aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-24 11:38:17 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-24 11:38:17 -0800
commit9f79fd89dc84cda7ebeb98a0b43c8e982fefa787 (patch)
treea83e0237d6f3a8745924b05332e25d5772aa7d60 /core
parent0af7f84c8eb631cd2e427b692f407ec2d37dad64 (diff)
parentd63856c361cf47b1a508397ee9de38a7b5899fa0 (diff)
downloadspark-9f79fd89dc84cda7ebeb98a0b43c8e982fefa787.tar.gz
spark-9f79fd89dc84cda7ebeb98a0b43c8e982fefa787.tar.bz2
spark-9f79fd89dc84cda7ebeb98a0b43c8e982fefa787.zip
Merge branch 'apache-master' into filestream-fix
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala90
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala34
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/JavaAPISuite.java33
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala73
30 files changed, 395 insertions, 113 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 10fae5af9f..ccffcc356c 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -29,8 +29,7 @@ import akka.pattern.ask
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{MetadataCleanerType, Utils, MetadataCleaner, TimeStampedHashMap}
-
+import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@@ -53,7 +52,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
private[spark] class MapOutputTracker extends Logging {
- private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ private val timeout = AkkaUtils.askTimeout
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index c47657f512..037cd1c774 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -125,6 +125,8 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
+
+ override def toString = rdd.toString
}
object JavaRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9e912d3adb..f344804b4c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -245,6 +245,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * Return an array that contains all of the elements in a specific partition of this RDD.
+ */
+ def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
+ // This is useful for implementing `take` from other language frontends
+ // like Python where the data is serialized.
+ import scala.collection.JavaConversions._
+ val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+ res.map(x => new java.util.ArrayList(x.toSeq)).toArray
+ }
+
+ /**
* Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index a659cc06c2..ca42c76928 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -235,10 +235,6 @@ private[spark] object PythonRDD {
file.close()
}
- def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
- implicit val cm : ClassTag[T] = rdd.elementClassTag
- rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
- }
}
private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 4d95efa73a..953755e40d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,14 +23,14 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor._
-import akka.pattern.AskTimeoutException
import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.AkkaUtils
/**
@@ -178,7 +178,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c627dd3806..eebd0794b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -18,19 +18,16 @@
package org.apache.spark.deploy.master
import java.text.SimpleDateFormat
-import java.util.concurrent.TimeUnit
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
-import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
-import akka.remote._
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import akka.util.Timeout
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -38,7 +35,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -64,8 +61,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
- var firstApp: Option[ApplicationInfo] = None
-
Utils.checkHost(host, "Expected hostname")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
@@ -444,14 +439,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
idToApp(app.id) = app
actorToApp(app.driver) = app
addressToApp(appAddress) = app
- if (firstApp == None) {
- firstApp = Some(app)
- }
- // TODO: What is firstApp?? Can we remove it?
- val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
- if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
- logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
- }
waitingApps += app
}
@@ -537,12 +524,10 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
- val timeoutDuration: FiniteDuration = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
- implicit val timeout = Timeout(timeoutDuration)
- val respFuture = actor ? RequestWebUIPort // ask pattern
- val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
+ val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
+ val timeout = AkkaUtils.askTimeout
+ val respFuture = actor.ask(RequestWebUIPort)(timeout)
+ val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 81e15c534f..6cc7fd2ff4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -18,12 +18,12 @@
package org.apache.spark.deploy.master
import scala.collection.JavaConversions._
-import scala.concurrent.ops._
-import org.apache.spark.Logging
import org.apache.zookeeper._
-import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.apache.zookeeper.data.Stat
+
+import org.apache.spark.Logging
/**
* Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
@@ -33,7 +33,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
* informed via zkDown().
*
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
- * times or a semantic exception is thrown (e.g.., "node already exists").
+ * times or a semantic exception is thrown (e.g., "node already exists").
*/
private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
@@ -103,6 +103,7 @@ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) ext
connectToZooKeeper()
case KeeperState.Disconnected =>
logWarning("ZooKeeper disconnected, will retry...")
+ case s => // Do nothing
}
}
}
@@ -179,7 +180,7 @@ private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) ext
} catch {
case e: KeeperException.NoNodeException => throw e
case e: KeeperException.NodeExistsException => throw e
- case e if n > 0 =>
+ case e: Exception if n > 0 =>
logError("ZooKeeper exception, " + n + " more retries...", e)
Thread.sleep(RETRY_WAIT_MILLIS)
retry(fn, n-1)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 7809013e83..7d535b08de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -21,8 +21,8 @@ import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
-import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.Logging
+import org.apache.spark.deploy.master.MasterMessages._
private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
@@ -105,7 +105,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, mas
// We found a different master file pointing to this process.
// This can happen in the following two cases:
// (1) The master process was restarted on the same node.
- // (2) The ZK server died between creating the node and returning the name of the node.
+ // (2) The ZK server died between creating the file and returning the name of the file.
// For this case, we will end up creating a second file, and MUST explicitly delete the
// first one, since our ZK session is still open.
// Note that this deletion will cause a NodeDeleted event to be fired so we check again for
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 3b983c19eb..dbb0cb90f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,32 +17,28 @@
package org.apache.spark.deploy.master.ui
+import scala.concurrent.Await
import scala.xml.Node
import akka.pattern.ask
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
import javax.servlet.http.HttpServletRequest
-
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
- implicit val timeout = parent.timeout
+ val timeout = parent.timeout
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
@@ -53,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 65e7a14e7a..4ef762892c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -17,37 +17,33 @@
package org.apache.spark.deploy.master.ui
-import javax.servlet.http.HttpServletRequest
-
+import scala.concurrent.Await
import scala.xml.Node
-import scala.concurrent.Await
import akka.pattern.ask
-import scala.concurrent.duration._
-
+import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.deploy.DeployWebUI
+import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
-import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
- implicit val timeout = parent.timeout
+ val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
- val state = Await.result(stateFuture, 30 seconds)
+ val state = Await.result(stateFuture, timeout)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
val workers = state.workers.sortBy(_.id)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index a211ce2b42..9ab594b682 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,25 +17,21 @@
package org.apache.spark.deploy.master.ui
-import scala.concurrent.duration._
-
import javax.servlet.http.HttpServletRequest
-
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 1a768d501f..0d59048313 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -42,13 +42,13 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
- val workerState = Await.result(stateFuture, 30 seconds)
+ val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
- val workerState = Await.result(stateFuture, 30 seconds)
+ val workerState = Await.result(stateFuture, timeout)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
val runningExecutorTable =
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 6c18a3c245..40d6bdb3fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -19,17 +19,14 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
-import scala.concurrent.duration._
-
-import akka.util.Timeout
import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
-import org.eclipse.jetty.server.{Handler, Server}
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Web UI server for the standalone worker.
@@ -37,8 +34,7 @@ import org.eclipse.jetty.server.{Handler, Server}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
- implicit val timeout = Timeout(
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+ val timeout = AkkaUtils.askTimeout
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b0a60ee60..0f19d7a96b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,18 +222,22 @@ private[spark] class Executor(
return
}
+ val resultSer = SparkEnv.get.serializer.newInstance()
+ val beforeSerialization = System.currentTimeMillis()
+ val valueBytes = resultSer.serialize(value)
+ val afterSerialization = System.currentTimeMillis()
+
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
+ m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
}
- // TODO I'd also like to track the time it takes to serialize the task results, but that is
- // huge headache, b/c we need to serialize the task metrics first. If TaskMetrics had a
- // custom serialized format, we could just change the relevants bytes in the byte buffer
+
val accumUpdates = Accumulators.values
- val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+ val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index c0ce46e379..bb1471d9ee 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,6 +44,11 @@ class TaskMetrics extends Serializable {
var jvmGCTime: Long = _
/**
+ * Amount of time spent serializing the task result
+ */
+ var resultSerializationTime: Long = _
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 7e468d0d67..e80cc6b0f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark]
-class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {
- def this() = this(null.asInstanceOf[T], null, null)
+ def this() = this(null.asInstanceOf[ByteBuffer], null, null)
override def writeExternal(out: ObjectOutput) {
- val objectSer = SparkEnv.get.serializer.newInstance()
- val bb = objectSer.serialize(value)
-
- out.writeInt(bb.remaining())
- Utils.writeByteBuffer(bb, out)
+ out.writeInt(valueBytes.remaining);
+ Utils.writeByteBuffer(valueBytes, out)
out.writeInt(accumUpdates.size)
for ((key, value) <- accumUpdates) {
@@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
override def readExternal(in: ObjectInput) {
- val objectSer = SparkEnv.get.serializer.newInstance()
-
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
in.readFully(byteVal)
- value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+ valueBytes = ByteBuffer.wrap(byteVal)
val numUpdates = in.readInt
if (numUpdates == 0) {
@@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
}
metrics = in.readObject().asInstanceOf[TaskMetrics]
}
+
+ def value(): T = {
+ val resultSer = SparkEnv.get.serializer.newInstance()
+ return resultSer.deserialize(valueBytes)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f5e8766f6d..7e22c843bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -47,6 +47,8 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
+ private val timeout = AkkaUtils.askTimeout
+
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
@@ -172,10 +174,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
- private val timeout = {
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
- }
-
def stopExecutors() {
try {
if (driverActor != null) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e05b842476..e1d68ef592 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -18,7 +18,6 @@
package org.apache.spark.storage
import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
@@ -26,15 +25,17 @@ import akka.pattern.ask
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.AkkaUtils
-private[spark] class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
+private[spark]
+class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection]) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
- val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val timeout = AkkaUtils.askTimeout
/** Remove a dead executor from the driver actor. This is only called on the driver side. */
def removeExecutor(execId: String) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 154a3980e9..21022e1cfb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -21,17 +21,15 @@ import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.collection.JavaConversions._
+import scala.concurrent.Future
+import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
-import scala.concurrent.duration._
-import scala.concurrent.Future
-
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
-
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -50,8 +48,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
- val akkaTimeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ private val akkaTimeout = AkkaUtils.askTimeout
initLogging()
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index e596690bc3..a31a7e1d58 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -56,7 +56,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
- "Active tasks", "Failed tasks", "Complete tasks", "Total tasks")
+ "Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
+ "Shuffle Write")
def execRow(kv: Seq[String]) = {
<tr>
@@ -73,6 +74,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(7)}</td>
<td>{kv(8)}</td>
<td>{kv(9)}</td>
+ <td>{Utils.msDurationToString(kv(10).toLong)}</td>
+ <td>{Utils.bytesToString(kv(11).toLong)}</td>
+ <td>{Utils.bytesToString(kv(12).toLong)}</td>
</tr>
}
@@ -111,6 +115,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
+ val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+ val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
+ val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
Seq(
execId,
@@ -122,7 +129,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
activeTasks.toString,
failedTasks.toString,
completedTasks.toString,
- totalTasks.toString
+ totalTasks.toString,
+ totalDuration.toString,
+ totalShuffleRead.toString,
+ totalShuffleWrite.toString
)
}
@@ -130,6 +140,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
+ val executorToDuration = HashMap[String, Long]()
+ val executorToShuffleRead = HashMap[String, Long]()
+ val executorToShuffleWrite = HashMap[String, Long]()
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
@@ -140,6 +153,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ val newDuration = executorToDuration.getOrElse(eid, 0L) + taskEnd.taskInfo.duration
+ executorToDuration.put(eid, newDuration)
+
activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
@@ -150,6 +166,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics))
}
+
+ // update shuffle read/write
+ if (null != taskEnd.taskMetrics) {
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
+ executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
+ shuffleRead.remoteBytesRead))
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
+ executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
+ shuffleWrite.shuffleBytesWritten))
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
new file mode 100644
index 0000000000..3c53e88380
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+/** class for reporting aggregated metrics for each executors in stageUI */
+private[spark] class ExecutorSummary {
+ var taskTime : Long = 0
+ var failedTasks : Int = 0
+ var succeededTasks : Int = 0
+ var shuffleRead : Long = 0
+ var shuffleWrite : Long = 0
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
new file mode 100644
index 0000000000..0dd876480a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import scala.xml.Node
+
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.util.Utils
+import scala.collection.mutable
+
+/** Page showing executor summary */
+private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int) {
+
+ val listener = parent.listener
+ val dateFmt = parent.dateFmt
+ val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
+ def toNodeSeq(): Seq[Node] = {
+ listener.synchronized {
+ executorTable()
+ }
+ }
+
+ /** Special table which merges two header cells. */
+ private def executorTable[T](): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Executor ID</th>
+ <th>Address</th>
+ <th>Task Time</th>
+ <th>Total Tasks</th>
+ <th>Failed Tasks</th>
+ <th>Succeeded Tasks</th>
+ <th>Shuffle Read</th>
+ <th>Shuffle Write</th>
+ </thead>
+ <tbody>
+ {createExecutorTable()}
+ </tbody>
+ </table>
+ }
+
+ private def createExecutorTable() : Seq[Node] = {
+ // make a executor-id -> address map
+ val executorIdToAddress = mutable.HashMap[String, String]()
+ val storageStatusList = parent.sc.getExecutorStorageStatus
+ for (statusId <- 0 until storageStatusList.size) {
+ val blockManagerId = parent.sc.getExecutorStorageStatus(statusId).blockManagerId
+ val address = blockManagerId.hostPort
+ val executorId = blockManagerId.executorId
+ executorIdToAddress.put(executorId, address)
+ }
+
+ val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
+ executorIdToSummary match {
+ case Some(x) => {
+ x.toSeq.sortBy(_._1).map{
+ case (k,v) => {
+ <tr>
+ <td>{k}</td>
+ <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
+ <td>{parent.formatDuration(v.taskTime)}</td>
+ <td>{v.failedTasks + v.succeededTasks}</td>
+ <td>{v.failedTasks}</td>
+ <td>{v.succeededTasks}</td>
+ <td>{Utils.bytesToString(v.shuffleRead)}</td>
+ <td>{Utils.bytesToString(v.shuffleWrite)}</td>
+ </tr>
+ }
+ }
+ }
+ case _ => { Seq[Node]() }
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 6b854740d6..07a42f0503 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -57,6 +57,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskInfos =
HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+ val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -124,8 +125,41 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val sid = taskEnd.task.stageId
+
+ // create executor summary map if necessary
+ val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
+ op = new HashMap[String, ExecutorSummary]())
+ executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
+ op = new ExecutorSummary())
+
+ val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
+ executorSummary match {
+ case Some(y) => {
+ // first update failed-task, succeed-task
+ taskEnd.reason match {
+ case Success =>
+ y.succeededTasks += 1
+ case _ =>
+ y.failedTasks += 1
+ }
+
+ // update duration
+ y.taskTime += taskEnd.taskInfo.duration
+
+ taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
+ y.shuffleRead += shuffleRead.remoteBytesRead
+ }
+
+ taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+ y.shuffleWrite += shuffleWrite.shuffleBytesWritten
+ }
+ }
+ case _ => {}
+ }
+
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive -= taskEnd.taskInfo
+
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 69f9446bab..8dcfeacb60 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<div>
<ul class="unstyled">
<li>
- <strong>Total duration across all tasks: </strong>
+ <strong>Total task time across all tasks: </strong>
{parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskHeaders: Seq[String] =
Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
- Seq("Duration", "GC Time") ++
+ Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
{if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
Seq("Errors")
@@ -101,6 +101,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
None
}
else {
+ val serializationTimes = validTasks.map{case (info, metrics, exception) =>
+ metrics.get.resultSerializationTime.toDouble}
+ val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
+ ms => parent.formatDuration(ms.toLong))
+
val serviceTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.executorRunTime.toDouble}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
@@ -149,6 +154,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
val listings: Seq[Seq[String]] = Seq(
+ serializationQuantiles,
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
@@ -160,11 +166,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
-
+ val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
+ <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
@@ -183,6 +190,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
+ val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)
val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
@@ -210,6 +218,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) parent.formatDuration(gcTime) else ""}
</td>
+ <td sorttable_customkey={serializationTime.toString}>
+ {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+ </td>
{if (shuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 9ad6de3c6d..463d85dfd5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
- <th>Duration</th>
+ <th>Task Time</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index a5446b3fc3..39f422dd6b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -28,9 +28,6 @@ import org.apache.spark.ui.JettyUtils._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
val indexPage = new IndexPage(this)
val rddPage = new RDDPage(this)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 74133cef6c..1c8b51b8bc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import scala.concurrent.duration.{Duration, FiniteDuration}
+
import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
import com.typesafe.config.ConfigFactory
@@ -84,4 +86,8 @@ private[spark] object AkkaUtils {
(actorSystem, boundPort)
}
+ /** Returns the default Spark timeout to use for Akka ask operations. */
+ def askTimeout: FiniteDuration = {
+ Duration.create(System.getProperty("spark.akka.askTimeout", "30").toLong, "seconds")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index ee5d8c9f13..5e2899c97b 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -897,4 +897,37 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
}
+
+ @Test
+ public void collectPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i % 2);
+ }
+ });
+
+ List[] parts = rdd1.collectPartitions(new int[] {0});
+ Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+ parts = rdd1.collectPartitions(new int[] {1, 2});
+ Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+ Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(2, 0)),
+ rdd2.collectPartitions(new int[] {0})[0]);
+
+ parts = rdd2.collectPartitions(new int[] {1, 2});
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+ new Tuple2<Integer, Integer>(4, 0)),
+ parts[0]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+ new Tuple2<Integer, Integer>(6, 0),
+ new Tuple2<Integer, Integer>(7, 1)),
+ parts[1]);
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 29c4cc5d9c..bb28a31a99 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -313,6 +313,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
}
def createTaskResult(id: Int): DirectTaskResult[Int] = {
- new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+ val valueSer = SparkEnv.get.serializer.newInstance()
+ new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
new file mode 100644
index 0000000000..67a57a0e7f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.scalatest.FunSuite
+import org.apache.spark.scheduler._
+import org.apache.spark.{LocalSparkContext, SparkContext, Success}
+import org.apache.spark.scheduler.SparkListenerTaskStart
+import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
+
+class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
+ test("test executor id to summary") {
+ val sc = new SparkContext("local", "test")
+ val listener = new JobProgressListener(sc)
+ val taskMetrics = new TaskMetrics()
+ val shuffleReadMetrics = new ShuffleReadMetrics()
+
+ // nothing in it
+ assert(listener.stageIdToExecutorSummaries.size == 0)
+
+ // finish this task, should get updated shuffleRead
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ var taskInfo = new TaskInfo(1234L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+ .shuffleRead == 1000)
+
+ // finish a task with unknown executor-id, nothing should happen
+ taskInfo = new TaskInfo(1234L, 0, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.size == 1)
+
+ // finish this task, should get updated duration
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskInfo = new TaskInfo(1235L, 0, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
+ .shuffleRead == 2000)
+
+ // finish this task, should get updated duration
+ shuffleReadMetrics.remoteBytesRead = 1000
+ taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskInfo = new TaskInfo(1236L, 0, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL)
+ taskInfo.finishTime = 1
+ listener.onTaskEnd(new SparkListenerTaskEnd(
+ new ShuffleMapTask(0, null, null, 0, null), Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
+ .shuffleRead == 1000)
+ }
+}