author | Aaron Davidson <aaron@databricks.com> | 2013-10-21 22:10:35 -0700
---|---|---
committer | Aaron Davidson <aaron@databricks.com> | 2013-10-21 22:17:53 -0700
commit | 053ef949ace4fa5581e86d71c5a8162ff5e376a4 (patch) |
tree | bc0293e1bf603b8611590a80ad8f03bcdd4cee36 /core |
parent | 4aa0ba1df7336ab5066be58c208e8b1eb69864df (diff) |
parent | a51359c917a9ebe379b32ebc53fd093c454ea195 (diff) |
Merge ShufflePerfTester patch into shuffle block consolidation
Diffstat (limited to 'core')
16 files changed, 491 insertions, 446 deletions
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
index f87460039b..0c47afae54 100644
--- a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
@@ -17,20 +17,29 @@
package org.apache.hadoop.mapred

+private[apache]
trait SparkHadoopMapRedUtil {
  def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
-    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
+      "org.apache.hadoop.mapred.JobContext")
+    val ctor = klass.getDeclaredConstructor(classOf[JobConf],
+      classOf[org.apache.hadoop.mapreduce.JobID])
    ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
  }

  def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
-    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
+    val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
+      "org.apache.hadoop.mapred.TaskAttemptContext")
    val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
  }

-  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+  def newTaskAttemptID(
+      jtIdentifier: String,
+      jobId: Int,
+      isMap: Boolean,
+      taskId: Int,
+      attemptId: Int) = {
    new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
  }

diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
index 93180307fa..32429f01ac 100644
--- a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -17,9 +17,10 @@
package org.apache.hadoop.mapreduce

-import org.apache.hadoop.conf.Configuration
import java.lang.{Integer => JInteger, Boolean => JBoolean}
+import org.apache.hadoop.conf.Configuration

+private[apache]
trait SparkHadoopMapReduceUtil {
  def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
    val klass = firstAvailableClass(
@@ -37,23 +38,31 @@ trait SparkHadoopMapReduceUtil {
    ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
  }

-  def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
-    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
+  def newTaskAttemptID(
+      jtIdentifier: String,
+      jobId: Int,
+      isMap: Boolean,
+      taskId: Int,
+      attemptId: Int) = {
+    val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
    try {
-      // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
+      // First, attempt to use the old-style constructor that takes a boolean isMap
+      // (not available in YARN)
      val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
-        classOf[Int], classOf[Int])
-      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
-        JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+        classOf[Int], classOf[Int])
+      ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
+        new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
    } catch {
      case exc: NoSuchMethodException => {
-        // failed, look for the new ctor that takes a TaskType (not available in 1.x)
-        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
-        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
+        // If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
+        val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
+          .asInstanceOf[Class[Enum[_]]]
+        val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
+          taskTypeClass, if(isMap) "MAP" else "REDUCE")
        val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
          classOf[Int], classOf[Int])
-        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
-          JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+        ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
+          new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
      }
    }
  }
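Editor's note: both traits lean on a firstAvailableClass helper that probes for the Hadoop 2.x implementation class and falls back to the 1.x one. Its body is not part of this diff; a minimal sketch of that pattern, with class names taken from the call sites above:

```scala
// Sketch only (the helper's actual body is not shown in this commit): resolve the
// first loadable class of two candidates, so one build runs on Hadoop 1.x and 2.x.
private def firstAvailableClass(first: String, second: String): Class[_] = {
  try {
    Class.forName(first)    // e.g. "org.apache.hadoop.mapred.JobContextImpl" (Hadoop 2.x)
  } catch {
    case e: ClassNotFoundException =>
      Class.forName(second) // e.g. "org.apache.hadoop.mapred.JobContext" (Hadoop 1.x)
  }
}
```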
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 48bbc78795..564466cfd5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,25 +51,20 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary

-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
-  ClusterScheduler}
-import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
+  SparkDeploySchedulerBackend, ClusterScheduler}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
+import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util._
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
-import scala.Some
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
+import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
+  TimeStampedHashMap, Utils}
+
+

/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -125,7 +120,7 @@ class SparkContext(
  private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
  private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)

-  // Initalize the Spark UI
+  // Initialize the Spark UI
  private[spark] val ui = new SparkUI(this)
  ui.bind()

@@ -161,8 +156,8 @@ class SparkContext(
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  // Regular expression for connecting to Spark deploy clusters
  val SPARK_REGEX = """spark://(.*)""".r
-  //Regular expression for connection to Mesos cluster
-  val MESOS_REGEX = """(mesos://.*)""".r
+  // Regular expression for connection to Mesos cluster
+  val MESOS_REGEX = """mesos://(.*)""".r

  master match {
    case "local" =>
@@ -292,11 +287,31 @@ class SparkContext(
    setJobGroup("", value)
  }

+  /**
+   * Assigns a group id to all the jobs started by this thread until the group id is set to a
+   * different value or cleared.
+   *
+   * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
+   * Application programmers can use this method to group all those jobs together and give a
+   * group description. Once set, the Spark web UI will associate such jobs with this group.
+   *
+   * The application can also use [[org.apache.spark.SparkContext.cancelJobGroup]] to cancel all
+   * running jobs in this group. For example,
+   * {{{
+   * // In the main thread:
+   * sc.setJobGroup("some_job_to_cancel", "some job description")
+   * sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
+   *
+   * // In a separate thread:
+   * sc.cancelJobGroup("some_job_to_cancel")
+   * }}}
+   */
  def setJobGroup(groupId: String, description: String) {
    setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
    setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
  }

+  /** Clear the job group id and its description. */
  def clearJobGroup() {
    setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
    setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
@@ -305,8 +320,8 @@ class SparkContext(
  // Post init
  taskScheduler.postStartHook()

-  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
-  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
+  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)

  def initDriverMetrics() {
    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -881,13 +896,15 @@ class SparkContext(
    new SimpleFutureAction(waiter, resultFunc)
  }

+  /**
+   * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
+   * for more information.
+   */
  def cancelJobGroup(groupId: String) {
    dagScheduler.cancelJobGroup(groupId)
  }

-  /**
-   * Cancel all jobs that have been scheduled or are running.
-   */
+  /** Cancel all jobs that have been scheduled or are running. */
  def cancelAllJobs() {
    dagScheduler.cancelAllJobs()
  }
@@ -949,9 +966,9 @@ class SparkContext(
 */
object SparkContext {

-  val SPARK_JOB_DESCRIPTION = "spark.job.description"
+  private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

-  val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
+  private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
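Editor's note: the MESOS_REGEX change moves the capture group inside the scheme, so a match now yields the part after "mesos://" rather than the whole URL. A quick illustration of the difference (hypothetical snippet, not from the commit):

```scala
val MESOS_REGEX = """mesos://(.*)""".r

"mesos://host1:5050" match {
  // The extractor now binds "host1:5050"; the old pattern """(mesos://.*)"""
  // would have bound the entire "mesos://host1:5050" string.
  case MESOS_REGEX(mesosUrl) => println("Mesos master at: " + mesosUrl)
  case _                     => println("not a Mesos URL")
}
```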
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index afa76a4a76..103a1c2051 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -17,14 +17,14 @@
package org.apache.hadoop.mapred

-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-
+import java.io.IOException
import java.text.SimpleDateFormat
import java.text.NumberFormat
-import java.io.IOException
import java.util.Date

+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.Logging
import org.apache.spark.SerializableWritable

@@ -36,6 +36,7 @@ import org.apache.spark.SerializableWritable
 * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
 * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
 */
+private[apache]
class SparkHadoopWriter(@transient jobConf: JobConf)
  extends Logging
  with SparkHadoopMapRedUtil
@@ -86,13 +87,11 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
    }

    getOutputCommitter().setupTask(getTaskContext())
-    writer = getOutputFormat().getRecordWriter(
-      fs, conf.value, outputName, Reporter.NULL)
+    writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
  }

  def write(key: AnyRef, value: AnyRef) {
-    if (writer!=null) {
-      //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
+    if (writer != null) {
      writer.write(key, value)
    } else {
      throw new IOException("Writer is null, open() has not been called")
@@ -182,6 +181,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
  }
}

+private[apache]
object SparkHadoopWriter {
  def createJobID(time: Date, id: Int): JobID = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 1f8ad688a6..12b4d94a56 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -308,7 +308,7 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
 * Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
 * collects a list of pickled strings that we pass to Python through a socket.
 */
-class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
  extends AccumulatorParam[JList[Array[Byte]]] {

  Utils.checkHost(serverHost, "Expected hostname")
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 993ba6bd3d..83cd3df5fa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,26 +17,31 @@
package org.apache.spark.deploy

-import com.google.common.collect.MapMaker
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

+import com.google.common.collect.MapMaker
+
/**
- * Contains util methods to interact with Hadoop from spark.
+ * Contains util methods to interact with Hadoop from Spark.
 */
+private[spark]
class SparkHadoopUtil {
  // A general, soft-reference map for metadata needed during HadoopRDD split computation
  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
-  // subsystems
+  /**
+   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
+   * subsystems.
+   */
  def newConfiguration(): Configuration = new Configuration()

-  // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
-  // cluster
+  /**
+   * Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+   * cluster.
+   */
  def addCredentials(conf: JobConf) {}

  def isYarnMode(): Boolean = { false }

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 032eb04f43..b773346df3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,30 +74,33 @@ private[spark] class Executor(
  private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
  Thread.currentThread.setContextClassLoader(replClassLoader)

-  // Make any thread terminations due to uncaught exceptions kill the entire
-  // executor process to avoid surprising stalls.
-  Thread.setDefaultUncaughtExceptionHandler(
-    new Thread.UncaughtExceptionHandler {
-      override def uncaughtException(thread: Thread, exception: Throwable) {
-        try {
-          logError("Uncaught exception in thread " + thread, exception)
-
-          // We may have been called from a shutdown hook. If so, we must not call System.exit().
-          // (If we do, we will deadlock.)
-          if (!Utils.inShutdown()) {
-            if (exception.isInstanceOf[OutOfMemoryError]) {
-              System.exit(ExecutorExitCode.OOM)
-            } else {
-              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+  if (!isLocal) {
+    // Setup an uncaught exception handler for non-local mode.
+    // Make any thread terminations due to uncaught exceptions kill the entire
+    // executor process to avoid surprising stalls.
+    Thread.setDefaultUncaughtExceptionHandler(
+      new Thread.UncaughtExceptionHandler {
+        override def uncaughtException(thread: Thread, exception: Throwable) {
+          try {
+            logError("Uncaught exception in thread " + thread, exception)
+
+            // We may have been called from a shutdown hook. If so, we must not call System.exit().
+            // (If we do, we will deadlock.)
+            if (!Utils.inShutdown()) {
+              if (exception.isInstanceOf[OutOfMemoryError]) {
+                System.exit(ExecutorExitCode.OOM)
+              } else {
+                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+              }
            }
+          } catch {
+            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
          }
-        } catch {
-          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
        }
      }
-    }
-  )
+    )
+  }

  val executorSource = new ExecutorSource(this, executorId)
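Editor's note: the handler above relies on Utils.inShutdown() to avoid calling System.exit() from within a shutdown hook, which would deadlock because exit() waits for shutdown hooks to finish. The helper's body is not in this diff; one common way to implement such a check (a sketch, not necessarily Spark's exact code):

```scala
// Sketch: the JVM rejects new shutdown hooks once shutdown has begun, so a failed
// registration attempt tells us we are running inside a shutdown hook.
def inShutdown(): Boolean = {
  try {
    val hook = new Thread { override def run() {} }
    Runtime.getRuntime.addShutdownHook(hook)
    Runtime.getRuntime.removeShutdownHook(hook)
  } catch {
    case ise: IllegalStateException => return true
  }
  false
}
```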
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index f311141148..0b4892f98f 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -102,4 +102,9 @@ class ShuffleWriteMetrics extends Serializable {
   * Number of bytes written for a shuffle
   */
  var shuffleBytesWritten: Long = _
+
+  /**
+   * Time spent blocking on writes to disk or buffer cache, in nanoseconds.
+   */
+  var shuffleWriteTime: Long = _
}

diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index f132e2b735..70a5a8caff 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -15,6 +15,8 @@
 * limitations under the License.
 */

+package org.apache
+
/**
 * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
 * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,

diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 3628b1b078..19c0251690 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -24,56 +24,54 @@
import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import java.util.concurrent.LinkedBlockingQueue
-import scala.collection.mutable.{Map, HashMap, ListBuffer}
-import scala.io.Source
+import scala.collection.mutable.{HashMap, ListBuffer}
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
-// Used to record runtime information for each job, including RDD graph
-// tasks' start/stop shuffle information and information from outside
-
+/**
+ * A logger class to record runtime information for jobs in Spark. This class outputs one log file
+ * per Spark job with information such as RDD graph, tasks start/stop, shuffle information.
+ *
+ * @param logDirName The base directory for the log files.
+ */
class JobLogger(val logDirName: String) extends SparkListener with Logging {
- private val logDir =
- if (System.getenv("SPARK_LOG_DIR") != null)
- System.getenv("SPARK_LOG_DIR")
- else
- "/tmp/spark"
+
+ private val logDir = Option(System.getenv("SPARK_LOG_DIR")).getOrElse("/tmp/spark")
+
private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIDToJobID = new HashMap[Int, Int]
private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
-
+
createLogDir()
def this() = this(String.valueOf(System.currentTimeMillis()))
-
- def getLogDir = logDir
- def getJobIDtoPrintWriter = jobIDToPrintWriter
- def getStageIDToJobID = stageIDToJobID
- def getJobIDToStages = jobIDToStages
- def getEventQueue = eventQueue
-
+
+ // The following 5 functions are used only in testing.
+ private[scheduler] def getLogDir = logDir
+ private[scheduler] def getJobIDtoPrintWriter = jobIDToPrintWriter
+ private[scheduler] def getStageIDToJobID = stageIDToJobID
+ private[scheduler] def getJobIDToStages = jobIDToStages
+ private[scheduler] def getEventQueue = eventQueue
+
// Create a folder for log files, the folder's name is the creation time of the jobLogger
protected def createLogDir() {
val dir = new File(logDir + "/" + logDirName + "/")
- if (dir.exists()) {
- return
- }
- if (dir.mkdirs() == false) {
- logError("create log directory error:" + logDir + "/" + logDirName + "/")
+ if (!dir.exists() && !dir.mkdirs()) {
+ logError("Error creating log directory: " + logDir + "/" + logDirName + "/")
}
}
// Create a log file for one job, the file name is the jobID
protected def createLogWriter(jobID: Int) {
- try{
+ try {
val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
jobIDToPrintWriter += (jobID -> fileWriter)
- } catch {
- case e: FileNotFoundException => e.printStackTrace()
- }
+ } catch {
+ case e: FileNotFoundException => e.printStackTrace()
+ }
}
// Close log file, and clean the stage relationship in stageIDToJobID
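Editor's note: the eventQueue field retained above decouples listener callbacks from file I/O — events go into a LinkedBlockingQueue and are drained elsewhere, so the scheduler never blocks on disk. The pattern in isolation (a generic sketch, not JobLogger's exact thread wiring):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Sketch: producers enqueue without blocking on I/O; one daemon thread drains.
val queue = new LinkedBlockingQueue[String]
val drainer = new Thread("event drainer") {
  override def run() {
    while (true) {
      val event = queue.take() // blocks until an event is available
      println(event)           // stand-in for writing to the job's log file
    }
  }
}
drainer.setDaemon(true)
drainer.start()
queue.put("STAGE_ID=1 STATUS=SUBMITTED")
```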
@@ -118,10 +116,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
var rddList = new ListBuffer[RDD[_]]
rddList += rdd
- rdd.dependencies.foreach{ dep => dep match {
- case shufDep: ShuffleDependency[_,_] =>
- case _ => rddList ++= getRddsInStage(dep.rdd)
- }
+ rdd.dependencies.foreach {
+ case shufDep: ShuffleDependency[_, _] =>
+ case dep: Dependency[_] => rddList ++= getRddsInStage(dep.rdd)
}
rddList
}
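Editor's note: the rewritten traversal above recurses through narrow dependencies only, because a ShuffleDependency marks a stage boundary. The same idea in isolation, over simplified stand-in types rather than the real RDD API:

```scala
// Simplified stand-ins for illustration; the real classes live in org.apache.spark.
abstract class Dep
case class ShuffleDep(parent: Node) extends Dep // stage boundary: do not recurse
case class NarrowDep(parent: Node) extends Dep  // same stage: recurse
case class Node(id: Int, deps: Seq[Dep])

def nodesInStage(node: Node): Seq[Node] =
  node +: node.deps.flatMap {
    case ShuffleDep(_)     => Nil                 // stop at the shuffle boundary
    case NarrowDep(parent) => nodesInStage(parent)
  }

// A map followed by a shuffle: only post-shuffle nodes belong to the final stage.
val source  = Node(1, Nil)
val mapped  = Node(2, Seq(NarrowDep(source)))
val reduced = Node(3, Seq(ShuffleDep(mapped)))
assert(nodesInStage(reduced).map(_.id) == Seq(3))
```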
@@ -161,29 +158,27 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
- rdd.dependencies.foreach{ dep => dep match {
- case shufDep: ShuffleDependency[_,_] =>
- val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
- jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
- case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
- }
+ rdd.dependencies.foreach {
+ case shufDep: ShuffleDependency[_, _] =>
+ val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
+ jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
+ case dep: Dependency[_] => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
}
}
protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
- var stageInfo: String = ""
- if (stage.isShuffleMap) {
- stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" +
- stage.shuffleDep.get.shuffleId
- }else{
- stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
+ val stageInfo = if (stage.isShuffleMap) {
+ "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
+ } else {
+ "STAGE_ID=" + stage.id + " RESULT_STAGE"
}
if (stage.jobId == jobID) {
jobLogInfo(jobID, indentString(indent) + stageInfo, false)
recordRddInStageGraph(jobID, stage.rdd, indent)
stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
- } else
+ } else {
jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
+ }
}
// Record task metrics into job log files
@@ -193,39 +188,32 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
" START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
- val readMetrics =
- taskMetrics.shuffleReadMetrics match {
- case Some(metrics) =>
- " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
- " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
- " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
- " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
- " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
- " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
- " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
- case None => ""
- }
- val writeMetrics =
- taskMetrics.shuffleWriteMetrics match {
- case Some(metrics) =>
- " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
- case None => ""
- }
+ val readMetrics = taskMetrics.shuffleReadMetrics match {
+ case Some(metrics) =>
+ " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
+ " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
+ " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
+ " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
+ " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
+ " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
+ " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+ case None => ""
+ }
+ val writeMetrics = taskMetrics.shuffleWriteMetrics match {
+ case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
+ case None => ""
+ }
stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
}
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
- stageLogInfo(
- stageSubmitted.stage.id,
- "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
- stageSubmitted.stage.id, stageSubmitted.taskSize))
+ stageLogInfo(stageSubmitted.stage.id, "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+ stageSubmitted.stage.id, stageSubmitted.taskSize))
}
override def onStageCompleted(stageCompleted: StageCompleted) {
- stageLogInfo(
- stageCompleted.stageInfo.stage.id,
+ stageLogInfo(stageCompleted.stageInfo.stage.id,
"STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
-
}
override def onTaskStart(taskStart: SparkListenerTaskStart) { }
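Editor's note: JobLogger plugs into the scheduler as a SparkListener, so the same callbacks are available to any custom listener. A minimal listener in the same style (a sketch assuming the 0.8-era callback and event names visible in this file):

```scala
import org.apache.spark.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted, StageCompleted}

// Bare-bones listener using the same callbacks JobLogger overrides above.
class StageStatusLogger extends SparkListener with Logging {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
    logInfo("STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
      stageSubmitted.stage.id, stageSubmitted.taskSize))
  }
  override def onStageCompleted(stageCompleted: StageCompleted) {
    logInfo("STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
  }
}
```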
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index e86852311f..24d97da6eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -164,16 +164,19 @@ private[spark] class ShuffleMapTask(

      // Commit the writes. Get the size of each bucket block (total block size).
      var totalBytes = 0L
+      var totalTime = 0L
      val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
        writer.commit()
        val size = writer.fileSegment().length
        totalBytes += size
+        totalTime += writer.timeWriting()
        MapOutputTracker.compressSize(size)
      }

      // Update shuffle metrics.
      val shuffleMetrics = new ShuffleWriteMetrics
      shuffleMetrics.shuffleBytesWritten = totalBytes
+      shuffleMetrics.shuffleWriteTime = totalTime
      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

      new MapStatus(blockManager.blockManagerId, compressedSizes)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1f173c7722..e6329cbd47 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import java.io.{InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}

-import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
+import scala.collection.mutable.{HashMap, ArrayBuffer}
import scala.util.Random

import akka.actor.{ActorSystem, Cancellable, Props}
@@ -322,89 +322,14 @@ private[spark] class BlockManager(
   */
  def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
    logDebug("Getting local block " + blockId)
-    val info = blockInfo.get(blockId).orNull
-    if (info != null) {
-      info.synchronized {
-
-        // In the another thread is writing the block, wait for it to become ready.
-        if (!info.waitForReady()) {
-          // If we get here, the block write failed.
-          logWarning("Block " + blockId + " was marked as failure.")
-          return None
-        }
-
-        val level = info.level
-        logDebug("Level for block " + blockId + " is " + level)
-
-        // Look for the block in memory
-        if (level.useMemory) {
-          logDebug("Getting block " + blockId + " from memory")
-          memoryStore.getValues(blockId) match {
-            case Some(iterator) =>
-              return Some(iterator)
-            case None =>
-              logDebug("Block " + blockId + " not found in memory")
-          }
-        }
-
-        // Look for block on disk, potentially loading it back into memory if required
-        if (level.useDisk) {
-          logDebug("Getting block " + blockId + " from disk")
-          if (level.useMemory && level.deserialized) {
-            diskStore.getValues(blockId) match {
-              case Some(iterator) =>
-                // Put the block back in memory before returning it
-                // TODO: Consider creating a putValues that also takes in a iterator ?
-                val elements = new ArrayBuffer[Any]
-                elements ++= iterator
-                memoryStore.putValues(blockId, elements, level, true).data match {
-                  case Left(iterator2) =>
-                    return Some(iterator2)
-                  case _ =>
-                    throw new Exception("Memory store did not return back an iterator")
-                }
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          } else if (level.useMemory && !level.deserialized) {
-            // Read it as a byte buffer into memory first, then return it
-            diskStore.getBytes(blockId) match {
-              case Some(bytes) =>
-                // Put a copy of the block back in memory before returning it. Note that we can't
-                // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
-                // The use of rewind assumes this.
-                assert (0 == bytes.position())
-                val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                copyForMemory.put(bytes)
-                memoryStore.putBytes(blockId, copyForMemory, level)
-                bytes.rewind()
-                return Some(dataDeserialize(blockId, bytes))
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          } else {
-            diskStore.getValues(blockId) match {
-              case Some(iterator) =>
-                return Some(iterator)
-              case None =>
-                throw new Exception("Block " + blockId + " not found on disk, though it should be")
-            }
-          }
-        }
-      }
-    } else {
-      logDebug("Block " + blockId + " not registered locally")
-    }
-    return None
+    doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
  }

  /**
   * Get block from the local block manager as serialized bytes.
   */
  def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
-    // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
    logDebug("Getting local block " + blockId + " as bytes")
-
    // As an optimization for map output fetches, if the block is for a shuffle, return it
    // without acquiring a lock; the disk store never deletes (recent) items so this should work
    if (blockId.isShuffle) {
@@ -415,12 +340,15 @@ private[spark] class BlockManager(
        throw new Exception("Block " + blockId + " not found on disk, though it should be")
      }
    }
+    doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+  }

+  private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
    val info = blockInfo.get(blockId).orNull
    if (info != null) {
      info.synchronized {

-        // In the another thread is writing the block, wait for it to become ready.
+        // If another thread is writing the block, wait for it to become ready.
        if (!info.waitForReady()) {
          // If we get here, the block write failed.
          logWarning("Block " + blockId + " was marked as failure.")
@@ -433,62 +361,104 @@ private[spark] class BlockManager(
        // Look for the block in memory
        if (level.useMemory) {
          logDebug("Getting block " + blockId + " from memory")
-          memoryStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              return Some(bytes)
+          val result = if (asValues) {
+            memoryStore.getValues(blockId)
+          } else {
+            memoryStore.getBytes(blockId)
+          }
+          result match {
+            case Some(values) =>
+              return Some(values)
            case None =>
              logDebug("Block " + blockId + " not found in memory")
          }
        }

-        // Look for block on disk
+        // Look for block on disk, potentially storing it back into memory if required:
        if (level.useDisk) {
-          // Read it as a byte buffer into memory first, then return it
-          diskStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              assert (0 == bytes.position())
-              if (level.useMemory) {
-                if (level.deserialized) {
-                  memoryStore.putBytes(blockId, bytes, level)
-                } else {
-                  // The memory store will hang onto the ByteBuffer, so give it a copy instead of
-                  // the memory-mapped file buffer we got from the disk store
-                  val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                  copyForMemory.put(bytes)
-                  memoryStore.putBytes(blockId, copyForMemory, level)
-                }
-              }
-              bytes.rewind()
-              return Some(bytes)
+          logDebug("Getting block " + blockId + " from disk")
+          val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
+            case Some(bytes) => bytes
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
+          assert (0 == bytes.position())
+
+          if (!level.useMemory) {
+            // If the block shouldn't be stored in memory, we can just return it:
+            if (asValues) {
+              return Some(dataDeserialize(blockId, bytes))
+            } else {
+              return Some(bytes)
+            }
+          } else {
+            // Otherwise, we also have to store something in the memory store:
+            if (!level.deserialized || !asValues) {
+              // We'll store the bytes in memory if the block's storage level includes
+              // "memory serialized", or if it should be cached as objects in memory
+              // but we only requested its serialized bytes:
+              val copyForMemory = ByteBuffer.allocate(bytes.limit)
+              copyForMemory.put(bytes)
+              memoryStore.putBytes(blockId, copyForMemory, level)
+              bytes.rewind()
+            }
+            if (!asValues) {
+              return Some(bytes)
+            } else {
+              val values = dataDeserialize(blockId, bytes)
+              if (level.deserialized) {
+                // Cache the values before returning them:
+                // TODO: Consider creating a putValues that also takes in a iterator?
+                val valuesBuffer = new ArrayBuffer[Any]
+                valuesBuffer ++= values
+                memoryStore.putValues(blockId, valuesBuffer, level, true).data match {
+                  case Left(values2) =>
+                    return Some(values2)
+                  case _ =>
+                    throw new Exception("Memory store did not return back an iterator")
+                }
+              } else {
+                return Some(values)
+              }
+            }
+          }
        }
      }
    } else {
      logDebug("Block " + blockId + " not registered locally")
    }
-    return None
+    None
  }

  /**
   * Get block from remote block managers.
   */
  def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
    logDebug("Getting remote block " + blockId)
-    // Get locations of block
-    val locations = Random.shuffle(master.getLocations(blockId))
+    doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+  }
+
+  /**
+   * Get block from remote block managers as serialized bytes.
+   */
+  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+    logDebug("Getting remote block " + blockId + " as bytes")
+    doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+  }

-    // Get block from remote locations
+  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+    require(blockId != null, "BlockId is null")
+    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      logDebug("Getting remote block " + blockId + " from " + loc)
      val data = BlockManagerWorker.syncGetBlock(
        GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
      if (data != null) {
-        return Some(dataDeserialize(blockId, data))
+        if (asValues) {
+          return Some(dataDeserialize(blockId, data))
+        } else {
+          return Some(data)
+        }
      }
      logDebug("The value of block " + blockId + " is null")
    }
@@ -497,31 +467,6 @@ private[spark] class BlockManager(
  }

  /**
-   * Get block from remote block managers as serialized bytes.
-   */
-  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
-    // TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be
-    // refactored.
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    logDebug("Getting remote block " + blockId + " as bytes")
-
-    val locations = master.getLocations(blockId)
-    for (loc <- locations) {
-      logDebug("Getting remote block " + blockId + " from " + loc)
-      val data = BlockManagerWorker.syncGetBlock(
-        GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
-      if (data != null) {
-        return Some(data)
-      }
-      logDebug("The value of block " + blockId + " is null")
-    }
-    logDebug("Block " + blockId + " not found")
-    return None
-  }
-
-  /**
   * Get a block from the block manager (either local or remote).
   */
  def get(blockId: BlockId): Option[Iterator[Any]] = {
@@ -590,17 +535,24 @@ private[spark] class BlockManager(
   * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
   */
  def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
-    tellMaster: Boolean = true) : Long = {
+      tellMaster: Boolean = true) : Long = {
+    require(values != null, "Values is null")
+    doPut(blockId, Left(values), level, tellMaster)
+  }

-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    if (values == null) {
-      throw new IllegalArgumentException("Values is null")
-    }
-    if (level == null || !level.isValid) {
-      throw new IllegalArgumentException("Storage level is null or invalid")
-    }
+  /**
+   * Put a new block of serialized bytes to the block manager.
+   */
+  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
+      tellMaster: Boolean = true) {
+    require(bytes != null, "Bytes is null")
+    doPut(blockId, Right(bytes), level, tellMaster)
+  }
+
+  private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer],
+      level: StorageLevel, tellMaster: Boolean = true): Long = {
+    require(blockId != null, "BlockId is null")
+    require(level != null && level.isValid, "StorageLevel is null or invalid")

    // Remember the block's storage level so that we can correctly drop it to disk if it needs
    // to be dropped right after it got put into memory. Note, however, that other threads will
@@ -616,7 +568,8 @@ private[spark] class BlockManager(
        return oldBlockOpt.get.size
      }

-      // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
+      // TODO: So the block info exists - but previous attempt to load it (?) failed.
+      // What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
@@ -625,10 +578,10 @@
    val startTimeMs = System.currentTimeMillis

-    // If we need to replicate the data, we'll want access to the values, but because our
-    // put will read the whole iterator, there will be no values left. For the case where
-    // the put serializes data, we'll remember the bytes, above; but for the case where it
-    // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
+    // If we're storing values and we need to replicate the data, we'll want access to the values,
+    // but because our put will read the whole iterator, there will be no values left. For the
+    // case where the put serializes data, we'll remember the bytes, above; but for the case where
+    // it doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
    var valuesAfterPut: Iterator[Any] = null

    // Ditto for the bytes after the put
@@ -637,30 +590,51 @@ private[spark] class BlockManager(
    // Size of the block in bytes (to return to caller)
    var size = 0L

+    // If we're storing bytes, then initiate the replication before storing them locally.
+    // This is faster as data is already serialized and ready to send.
+    val replicationFuture = if (data.isRight && level.replication > 1) {
+      val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
+      Future {
+        replicate(blockId, bufferView, level)
+      }
+    } else {
+      null
+    }
+
    myInfo.synchronized {
      logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
        + " to get into synchronized block")

      var marked = false
      try {
-        if (level.useMemory) {
-          // Save it just to memory first, even if it also has useDisk set to true; we will later
-          // drop it to disk if the memory store can't hold it.
-          val res = memoryStore.putValues(blockId, values, level, true)
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case Left(newIterator) => valuesAfterPut = newIterator
+        data match {
+          case Left(values) => {
+            if (level.useMemory) {
+              // Save it just to memory first, even if it also has useDisk set to true; we will
+              // drop it to disk later if the memory store can't hold it.
+              val res = memoryStore.putValues(blockId, values, level, true)
+              size = res.size
+              res.data match {
+                case Right(newBytes) => bytesAfterPut = newBytes
+                case Left(newIterator) => valuesAfterPut = newIterator
+              }
+            } else {
+              // Save directly to disk.
+              // Don't get back the bytes unless we replicate them.
+              val askForBytes = level.replication > 1
+              val res = diskStore.putValues(blockId, values, level, askForBytes)
+              size = res.size
+              res.data match {
+                case Right(newBytes) => bytesAfterPut = newBytes
+                case _ =>
+              }
+            }
          }
-        } else {
-          // Save directly to disk.
-          // Don't get back the bytes unless we replicate them.
-          val askForBytes = level.replication > 1
-          val res = diskStore.putValues(blockId, values, level, askForBytes)
-          size = res.size
-          res.data match {
-            case Right(newBytes) => bytesAfterPut = newBytes
-            case _ =>
+          case Right(bytes) => {
+            bytes.rewind()
+            // Store it only in memory at first, even if useDisk is also set to true
+            (if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level)
+            size = bytes.limit
          }
        }
@@ -685,125 +659,39 @@ private[spark] class BlockManager(
    }
    logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))

-    // Replicate block if required
+    // Either we're storing bytes and we asynchronously started replication, or we're storing
+    // values and need to serialize and replicate them now:
    if (level.replication > 1) {
-      val remoteStartTime = System.currentTimeMillis
-      // Serialize the block if not already done
-      if (bytesAfterPut == null) {
-        if (valuesAfterPut == null) {
-          throw new SparkException(
-            "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
-        }
-        bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
-      }
-      replicate(blockId, bytesAfterPut, level)
-      logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
-    }
-    BlockManager.dispose(bytesAfterPut)
-
-    return size
-  }
-
-
-  /**
-   * Put a new block of serialized bytes to the block manager.
-   */
-  def putBytes(
-    blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
-
-    if (blockId == null) {
-      throw new IllegalArgumentException("Block Id is null")
-    }
-    if (bytes == null) {
-      throw new IllegalArgumentException("Bytes is null")
-    }
-    if (level == null || !level.isValid) {
-      throw new IllegalArgumentException("Storage level is null or invalid")
-    }
-
-    // Remember the block's storage level so that we can correctly drop it to disk if it needs
-    // to be dropped right after it got put into memory. Note, however, that other threads will
-    // not be able to get() this block until we call markReady on its BlockInfo.
-    val myInfo = {
-      val tinfo = new BlockInfo(level, tellMaster)
-      // Do atomically !
-      val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
-
-      if (oldBlockOpt.isDefined) {
-        if (oldBlockOpt.get.waitForReady()) {
-          logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-          return
-        }
-
-        // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
-        oldBlockOpt.get
-      } else {
-        tinfo
-      }
-    }
-
-    val startTimeMs = System.currentTimeMillis
-
-    // Initiate the replication before storing it locally. This is faster as
-    // data is already serialized and ready for sending
-    val replicationFuture = if (level.replication > 1) {
-      val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper
-      Future {
-        replicate(blockId, bufferView, level)
-      }
-    } else {
-      null
-    }
-
-    myInfo.synchronized {
-      logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
-        + " to get into synchronized block")
-
-      var marked = false
-      try {
-        if (level.useMemory) {
-          // Store it only in memory at first, even if useDisk is also set to true
-          bytes.rewind()
-          memoryStore.putBytes(blockId, bytes, level)
-        } else {
-          bytes.rewind()
-          diskStore.putBytes(blockId, bytes, level)
-        }
-
-        // assert (0 == bytes.position(), "" + bytes)
-
-        // Now that the block is in either the memory or disk store, let other threads read it,
-        // and tell the master about it.
-        marked = true
-        myInfo.markReady(bytes.limit)
-        if (tellMaster) {
-          reportBlockStatus(blockId, myInfo)
-        }
-      } finally {
-        // If we failed at putting the block to memory/disk, notify other possible readers
-        // that it has failed, and then remove it from the block info map.
-        if (! marked) {
-          // Note that the remove must happen before markFailure otherwise another thread
-          // could've inserted a new BlockInfo before we remove it.
-          blockInfo.remove(blockId)
-          myInfo.markFailure()
-          logWarning("Putting block " + blockId + " failed")
+      data match {
+        case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
+        case Left(values) => {
+          val remoteStartTime = System.currentTimeMillis
+          // Serialize the block if not already done
+          if (bytesAfterPut == null) {
+            if (valuesAfterPut == null) {
+              throw new SparkException(
+                "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
+            }
+            bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
+          }
+          replicate(blockId, bytesAfterPut, level)
+          logDebug("Put block " + blockId + " remotely took " +
            Utils.getUsedTimeMs(remoteStartTime))
        }
      }
    }

-    // If replication had started, then wait for it to finish
-    if (level.replication > 1) {
-      Await.ready(replicationFuture, Duration.Inf)
-    }
+    BlockManager.dispose(bytesAfterPut)

    if (level.replication > 1) {
-      logDebug("PutBytes for block " + blockId + " with replication took " +
+      logDebug("Put for block " + blockId + " with replication took " +
        Utils.getUsedTimeMs(startTimeMs))
    } else {
-      logDebug("PutBytes for block " + blockId + " without replication took " +
+      logDebug("Put for block " + blockId + " without replication took " +
        Utils.getUsedTimeMs(startTimeMs))
    }
+
+    size
  }

  /**
@@ -928,34 +816,20 @@ private[spark] class BlockManager(
  private def dropOldNonBroadcastBlocks(cleanupTime: Long) {
    logInfo("Dropping non broadcast blocks older than " + cleanupTime)
-    val iterator = blockInfo.internalMap.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime && !id.isBroadcast) {
-        info.synchronized {
-          val level = info.level
-          if (level.useMemory) {
-            memoryStore.remove(id)
-          }
-          if (level.useDisk) {
-            diskStore.remove(id)
-          }
-          iterator.remove()
-          logInfo("Dropped block " + id)
-        }
-        reportBlockStatus(id, info)
-      }
-    }
+    dropOldBlocks(cleanupTime, !_.isBroadcast)
  }

  private def dropOldBroadcastBlocks(cleanupTime: Long) {
    logInfo("Dropping broadcast blocks older than " + cleanupTime)
+    dropOldBlocks(cleanupTime, _.isBroadcast)
+  }
+
+  private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)) {
    val iterator = blockInfo.internalMap.entrySet().iterator()
    while (iterator.hasNext) {
      val entry = iterator.next()
      val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
-      if (time < cleanupTime && id.isBroadcast) {
+      if (time < cleanupTime && shouldDrop(id)) {
        info.synchronized {
          val level = info.level
          if (level.useMemory) {
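Editor's note: the refactor above funnels put and putBytes into a single doPut keyed on Either[ArrayBuffer[Any], ByteBuffer], so validation, BlockInfo bookkeeping, and replication live in one place. The shape of that pattern in isolation (a toy sketch, not the real store logic):

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

object PutSketch {
  // Two public entry points delegate to one private implementation; the Either keeps
  // the values-vs-bytes distinction explicit instead of duplicating the method body.
  def put(values: ArrayBuffer[Any]): Long = doPut(Left(values))
  def putBytes(bytes: ByteBuffer): Long = doPut(Right(bytes))

  private def doPut(data: Either[ArrayBuffer[Any], ByteBuffer]): Long = data match {
    case Left(values) => values.size.toLong // stand-in for "store deserialized values"
    case Right(bytes) => bytes.limit.toLong // stand-in for "store serialized bytes"
  }
}
```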
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 951503019f..3a65e55733 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -26,6 +26,7 @@ import org.apache.spark.storage.BlockManagerMessages._
 * An actor to take commands from the master to execute options. For example,
 * this is used to remove blocks from the slave's BlockManager.
 */
+private[storage]
class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
  override def receive = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 6e4382d71e..32d2dd0694 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
 *
 * This interface does not support concurrent writes.
 */
-private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
+abstract class BlockObjectWriter(val blockId: BlockId) {

  var closeEventHandler: () => Unit = _

@@ -69,33 +69,60 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
   * Returns the file segment of committed data that this Writer has written.
   */
  def fileSegment(): FileSegment
+
+  /**
+   * Cumulative time spent performing blocking writes, in ns.
+   */
+  def timeWriting(): Long
}

/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
-private[spark] class DiskBlockObjectWriter(
-  blockId: BlockId,
-  file: File,
-  serializer: Serializer,
-  bufferSize: Int,
-  compressStream: OutputStream => OutputStream)
+class DiskBlockObjectWriter(
+    blockId: BlockId,
+    file: File,
+    serializer: Serializer,
+    bufferSize: Int,
+    compressStream: OutputStream => OutputStream)
  extends BlockObjectWriter(blockId)
  with Logging {

+  /** Intercepts write calls and tracks total time spent writing. Not thread safe. */
+  private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
+    def timeWriting = _timeWriting
+    private var _timeWriting = 0L
+
+    private def callWithTiming(f: => Unit) = {
+      val start = System.nanoTime()
+      f
+      _timeWriting += (System.nanoTime() - start)
+    }
+
+    def write(i: Int): Unit = callWithTiming(out.write(i))
+    override def write(b: Array[Byte]) = callWithTiming(out.write(b))
+    override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+  }
+
+  private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
+
  /** The file channel, used for repositioning / truncating the file. */
  private var channel: FileChannel = null
  private var bs: OutputStream = null
+  private var fos: FileOutputStream = null
+  private var ts: TimeTrackingOutputStream = null
  private var objOut: SerializationStream = null
  private var initialPosition = 0L
  private var lastValidPosition = 0L
  private var initialized = false
+  private var _timeWriting = 0L

  override def open(): BlockObjectWriter = {
-    val fos = new FileOutputStream(file, true)
+    fos = new FileOutputStream(file, true)
+    ts = new TimeTrackingOutputStream(fos)
    channel = fos.getChannel()
    initialPosition = channel.position
    lastValidPosition = initialPosition
-    bs = compressStream(new FastBufferedOutputStream(fos, bufferSize))
+    bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
    objOut = serializer.newInstance().serializeStream(bs)
    initialized = true
    this
@@ -103,11 +130,24 @@ private[spark] class DiskBlockObjectWriter(
  override def close() {
    if (initialized) {
+      if (syncWrites) {
+        // Force outstanding writes to disk and track how long it takes
+        objOut.flush()
+        val start = System.nanoTime()
+        fos.getFD.sync()
+        _timeWriting += System.nanoTime() - start
+      }
      objOut.close()
+
+      _timeWriting += ts.timeWriting
+
      channel = null
      bs = null
+      fos = null
+      ts = null
      objOut = null
    }
+    // Invoke the close callback handler.
    super.close()
  }
@@ -148,4 +188,7 @@ private[spark] class DiskBlockObjectWriter(
    val bytesWritten = lastValidPosition - initialPosition
    new FileSegment(file, initialPosition, bytesWritten)
  }
+
+  // Only valid if called after close()
+  override def timeWriting() = _timeWriting
}
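Editor's note: when spark.shuffle.sync is enabled, close() forces buffered data to disk via fos.getFD.sync() and charges the wait against the write time. The same measurement in isolation (a sketch; the temp file is illustrative only):

```scala
import java.io.{File, FileOutputStream}

// Sketch: write, then fsync, and measure how long the blocking sync takes.
val file = File.createTempFile("sync-sketch", ".bin")
val fos = new FileOutputStream(file, true)
fos.write("payload".getBytes)
fos.flush()
val start = System.nanoTime()
fos.getFD.sync()                          // blocks until the OS flushes to the device
val syncNanos = System.nanoTime() - start
fos.close()
println("fsync took " + syncNanos + " ns")
```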
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
new file mode 100644
index 0000000000..1b074e5ec7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -0,0 +1,84 @@
+package org.apache.spark.storage
+
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CountDownLatch, Executors}
+
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
+
+/**
+ * Utility for micro-benchmarking shuffle write performance.
+ *
+ * Writes simulated shuffle output from several threads and records the observed throughput.
+ */
+object StoragePerfTester {
+  def main(args: Array[String]) = {
+    /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
+    val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
+
+    /** Number of map tasks. All tasks execute concurrently. */
+    val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
+
+    /** Number of reduce splits for each map task. */
+    val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
+
+    val recordLength = 1000 // ~1KB records
+    val totalRecords = dataSizeMb * 1000
+    val recordsPerMap = totalRecords / numMaps
+
+    val writeData = "1" * recordLength
+    val executor = Executors.newFixedThreadPool(numMaps)
+
+    System.setProperty("spark.shuffle.compress", "false")
+    System.setProperty("spark.shuffle.sync", "true")
+
+    // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
+    val sc = new SparkContext("local[4]", "Write Tester")
+    val blockManager = sc.env.blockManager
+
+    def writeOutputBytes(mapId: Int, total: AtomicLong) = {
+      val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
+        new KryoSerializer())
+      val buckets = shuffle.acquireWriters(mapId)
+      for (i <- 1 to recordsPerMap) {
+        buckets.writers(i % numOutputSplits).write(writeData)
+      }
+      buckets.writers.map {w =>
+        w.commit()
+        total.addAndGet(w.fileSegment().length)
+        w.close()
+      }
+
+      shuffle.releaseWriters(buckets)
+    }
+
+    val start = System.currentTimeMillis()
+    val latch = new CountDownLatch(numMaps)
+    val totalBytes = new AtomicLong()
+    for (task <- 1 to numMaps) {
+      executor.submit(new Runnable() {
+        override def run() = {
+          try {
+            writeOutputBytes(task, totalBytes)
+            latch.countDown()
+          } catch {
+            case e: Exception =>
+              println("Exception in child thread: " + e + " " + e.getMessage)
+              System.exit(1)
+          }
+        }
+      })
+    }
+    latch.await()
+    val end = System.currentTimeMillis()
+    val time = (end - start) / 1000.0
+    val bytesPerSecond = totalBytes.get() / time
+    val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
+
+    System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
+    System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
+    System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
+
+    executor.shutdown()
+    sc.stop()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 163a3746ea..b7c81d091c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
      Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
      Seq("GC Time") ++
      {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
-      {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+      {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
      Seq("Errors")

    val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -169,6 +169,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
        Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
      }}
      {if (shuffleWrite) {
+        <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+          parent.formatDuration(s.shuffleWriteTime / (1000 * 1000))}.getOrElse("")}</td>
        <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
          Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
      }}
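Editor's note: shuffleWriteTime is accumulated in nanoseconds (see the TaskMetrics change above), so the new Write Time column divides by 1000 * 1000 before handing the value to formatDuration, which takes milliseconds. The conversion in isolation:

```scala
// Nanoseconds -> milliseconds, as done for the Write Time column above.
def writeTimeMillis(shuffleWriteTimeNanos: Long): Long =
  shuffleWriteTimeNanos / (1000 * 1000)

assert(writeTimeMillis(2500L * 1000 * 1000) == 2500)
```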