From 96c2c714f4f9abe20d4c42d99ffaafcb269714a1 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 15 Jan 2015 17:53:42 -0800 Subject: [SPARK-4857] [CORE] Adds Executor membership events to SparkListener Adds onExecutorAdded and onExecutorRemoved events to the SparkListener. This will allow a client to get notified when an executor has been added/removed and provide additional information such as how many vcores it is consuming. In addition, this commit adds a SparkListenerAdapter to the Java API that provides default implementations to the SparkListener. This is to get around the fact that default implementations for traits don't work in Java. Having Java clients extend SparkListenerAdapter moving forward will prevent breakage in java when we add new events to SparkListener. Author: Kostas Sakellis Closes #3711 from ksakellis/kostas-spark-4857 and squashes the following commits: 946d2c5 [Kostas Sakellis] Added executorAdded/Removed events to MesosSchedulerBackend b1d054a [Kostas Sakellis] Remove executorInfo from ExecutorRemoved event 1727b38 [Kostas Sakellis] Renamed ExecutorDetails back to ExecutorInfo and other CR feedback 14fe78d [Kostas Sakellis] Added executor added/removed events to json protocol 93d087b [Kostas Sakellis] [SPARK-4857] [CORE] Adds Executor membership events to SparkListener --- .../java/org/apache/spark/JavaSparkListener.java | 97 ++++++++++++++++++++++ .../spark/deploy/master/ApplicationInfo.scala | 14 ++-- .../apache/spark/deploy/master/ExecutorDesc.scala | 52 ++++++++++++ .../apache/spark/deploy/master/ExecutorInfo.scala | 52 ------------ .../org/apache/spark/deploy/master/Master.scala | 2 +- .../apache/spark/deploy/master/WorkerInfo.scala | 6 +- .../spark/deploy/master/ui/ApplicationPage.scala | 4 +- .../spark/scheduler/EventLoggingListener.scala | 4 + .../org/apache/spark/scheduler/SparkListener.scala | 22 ++++- .../apache/spark/scheduler/SparkListenerBus.scala | 4 + .../cluster/CoarseGrainedSchedulerBackend.scala | 6 +- .../spark/scheduler/cluster/ExecutorData.scala | 6 +- .../spark/scheduler/cluster/ExecutorInfo.scala | 45 ++++++++++ .../cluster/mesos/MesosSchedulerBackend.scala | 32 +++++-- .../scala/org/apache/spark/util/JsonProtocol.scala | 40 ++++++++- .../scheduler/EventLoggingListenerSuite.scala | 3 +- .../scheduler/SparkListenerWithClusterSuite.scala | 62 ++++++++++++++ .../mesos/MesosSchedulerBackendSuite.scala | 16 +++- .../org/apache/spark/util/JsonProtocolSuite.scala | 41 +++++++++ 19 files changed, 425 insertions(+), 83 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/JavaSparkListener.java create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala delete mode 100644 core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala create mode 100644 core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala (limited to 'core') diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java new file mode 100644 index 0000000000..646496f313 --- /dev/null +++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; +import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorAdded; +import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorRemoved; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.SparkListenerTaskGettingResult; +import org.apache.spark.scheduler.SparkListenerTaskStart; +import org.apache.spark.scheduler.SparkListenerUnpersistRDD; + +/** + * Java clients should extend this class instead of implementing + * SparkListener directly. This is to prevent java clients + * from breaking when new events are added to the SparkListener + * trait. + * + * This is a concrete class instead of abstract to enforce + * new events get added to both the SparkListener and this adapter + * in lockstep. + */ +public class JavaSparkListener implements SparkListener { + + @Override + public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { } + + @Override + public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { } + + @Override + public void onTaskStart(SparkListenerTaskStart taskStart) { } + + @Override + public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { } + + @Override + public void onTaskEnd(SparkListenerTaskEnd taskEnd) { } + + @Override + public void onJobStart(SparkListenerJobStart jobStart) { } + + @Override + public void onJobEnd(SparkListenerJobEnd jobEnd) { } + + @Override + public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { } + + @Override + public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { } + + @Override + public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { } + + @Override + public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { } + + @Override + public void onApplicationStart(SparkListenerApplicationStart applicationStart) { } + + @Override + public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { } + + @Override + public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { } + + @Override + public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { } + + @Override + public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index ad7d81747c..ede0a9dbef 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -38,8 +38,8 @@ private[spark] class ApplicationInfo( extends Serializable { @transient var state: ApplicationState.Value = _ - @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _ - @transient var removedExecutors: ArrayBuffer[ExecutorInfo] = _ + @transient var executors: mutable.HashMap[Int, ExecutorDesc] = _ + @transient var removedExecutors: ArrayBuffer[ExecutorDesc] = _ @transient var coresGranted: Int = _ @transient var endTime: Long = _ @transient var appSource: ApplicationSource = _ @@ -55,12 +55,12 @@ private[spark] class ApplicationInfo( private def init() { state = ApplicationState.WAITING - executors = new mutable.HashMap[Int, ExecutorInfo] + executors = new mutable.HashMap[Int, ExecutorDesc] coresGranted = 0 endTime = -1L appSource = new ApplicationSource(this) nextExecutorId = 0 - removedExecutors = new ArrayBuffer[ExecutorInfo] + removedExecutors = new ArrayBuffer[ExecutorDesc] } private def newExecutorId(useID: Option[Int] = None): Int = { @@ -75,14 +75,14 @@ private[spark] class ApplicationInfo( } } - def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = { - val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave) + def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorDesc = { + val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave) executors(exec.id) = exec coresGranted += cores exec } - def removeExecutor(exec: ExecutorInfo) { + def removeExecutor(exec: ExecutorDesc) { if (executors.contains(exec.id)) { removedExecutors += executors(exec.id) executors -= exec.id diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala new file mode 100644 index 0000000000..5d620dfcab --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorDesc.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} + +private[spark] class ExecutorDesc( + val id: Int, + val application: ApplicationInfo, + val worker: WorkerInfo, + val cores: Int, + val memory: Int) { + + var state = ExecutorState.LAUNCHING + + /** Copy all state (non-val) variables from the given on-the-wire ExecutorDescription. */ + def copyState(execDesc: ExecutorDescription) { + state = execDesc.state + } + + def fullId: String = application.id + "/" + id + + override def equals(other: Any): Boolean = { + other match { + case info: ExecutorDesc => + fullId == info.fullId && + worker.id == info.worker.id && + cores == info.cores && + memory == info.memory + case _ => false + } + } + + override def toString: String = fullId + + override def hashCode: Int = toString.hashCode() +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala deleted file mode 100644 index d417070c51..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.master - -import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} - -private[spark] class ExecutorInfo( - val id: Int, - val application: ApplicationInfo, - val worker: WorkerInfo, - val cores: Int, - val memory: Int) { - - var state = ExecutorState.LAUNCHING - - /** Copy all state (non-val) variables from the given on-the-wire ExecutorDescription. */ - def copyState(execDesc: ExecutorDescription) { - state = execDesc.state - } - - def fullId: String = application.id + "/" + id - - override def equals(other: Any): Boolean = { - other match { - case info: ExecutorInfo => - fullId == info.fullId && - worker.id == info.worker.id && - cores == info.cores && - memory == info.memory - case _ => false - } - } - - override def toString: String = fullId - - override def hashCode: Int = toString.hashCode() -} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 4b631ec639..d92d99310a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -581,7 +581,7 @@ private[spark] class Master( } } - def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) { + def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 473ddc23ff..e94aae93e4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -38,7 +38,7 @@ private[spark] class WorkerInfo( Utils.checkHost(host, "Expected hostname") assert (port > 0) - @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info + @transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info @transient var state: WorkerState.Value = _ @transient var coresUsed: Int = _ @@ -70,13 +70,13 @@ private[spark] class WorkerInfo( host + ":" + port } - def addExecutor(exec: ExecutorInfo) { + def addExecutor(exec: ExecutorDesc) { executors(exec.fullId) = exec coresUsed += exec.cores memoryUsed += exec.memory } - def removeExecutor(exec: ExecutorInfo) { + def removeExecutor(exec: ExecutorDesc) { if (executors.contains(exec.fullId)) { executors -= exec.fullId coresUsed -= exec.cores diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 4588c130ef..3aae2b95d7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -27,7 +27,7 @@ import org.json4s.JValue import org.apache.spark.deploy.{ExecutorState, JsonProtocol} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} -import org.apache.spark.deploy.master.ExecutorInfo +import org.apache.spark.deploy.master.ExecutorDesc import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.util.Utils @@ -109,7 +109,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app UIUtils.basicSparkPage(content, "Application: " + app.desc.name) } - private def executorRow(executor: ExecutorInfo): Seq[Node] = { + private def executorRow(executor: ExecutorDesc): Seq[Node] = { {executor.id} diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 27bf4f1599..30075c172b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -168,6 +168,10 @@ private[spark] class EventLoggingListener( logEvent(event, flushLogger = true) override def onApplicationEnd(event: SparkListenerApplicationEnd) = logEvent(event, flushLogger = true) + override def onExecutorAdded(event: SparkListenerExecutorAdded) = + logEvent(event, flushLogger = true) + override def onExecutorRemoved(event: SparkListenerExecutorRemoved) = + logEvent(event, flushLogger = true) // No-op because logging every update would be overkill override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index b62b0c1312..4840d8bd2d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -25,6 +25,7 @@ import scala.collection.mutable import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{Distribution, Utils} @@ -84,6 +85,14 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan @DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo) + extends SparkListenerEvent + +@DeveloperApi +case class SparkListenerExecutorRemoved(executorId: String) + extends SparkListenerEvent + /** * Periodic updates from executors. * @param execId executor id @@ -109,7 +118,8 @@ private[spark] case object SparkListenerShutdown extends SparkListenerEvent /** * :: DeveloperApi :: * Interface for listening to events from the Spark scheduler. Note that this is an internal - * interface which might change in different Spark releases. + * interface which might change in different Spark releases. Java clients should extend + * {@link JavaSparkListener} */ @DeveloperApi trait SparkListener { @@ -183,6 +193,16 @@ trait SparkListener { * Called when the driver receives task metrics from an executor in a heartbeat. */ def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { } + + /** + * Called when the driver registers a new executor. + */ + def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { } + + /** + * Called when the driver removes an executor. + */ + def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e79ffd7a35..e700c6af54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -70,6 +70,10 @@ private[spark] trait SparkListenerBus extends Logging { foreachListener(_.onApplicationEnd(applicationEnd)) case metricsUpdate: SparkListenerExecutorMetricsUpdate => foreachListener(_.onExecutorMetricsUpdate(metricsUpdate)) + case executorAdded: SparkListenerExecutorAdded => + foreachListener(_.onExecutorAdded(executorAdded)) + case executorRemoved: SparkListenerExecutorRemoved => + foreachListener(_.onExecutorRemoved(executorRemoved)) case SparkListenerShutdown => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index fe9914b50b..5786d36746 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -28,7 +28,7 @@ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState} -import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils} @@ -66,6 +66,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste // Number of executors requested from the cluster manager that have not registered yet private var numPendingExecutors = 0 + private val listenerBus = scheduler.sc.listenerBus + // Executors we have requested the cluster manager to kill that have not died yet private val executorsPendingToRemove = new HashSet[String] @@ -106,6 +108,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") } } + listenerBus.post(SparkListenerExecutorAdded(executorId, data)) makeOffers() } @@ -213,6 +216,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste totalCoreCount.addAndGet(-executorInfo.totalCores) totalRegisteredExecutors.addAndGet(-1) scheduler.executorLost(executorId, SlaveLost(reason)) + listenerBus.post(SparkListenerExecutorRemoved(executorId)) case None => logError(s"Asked to remove non-existent executor $executorId") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index b71bd5783d..eb52ddfb1e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -31,7 +31,7 @@ import akka.actor.{Address, ActorRef} private[cluster] class ExecutorData( val executorActor: ActorRef, val executorAddress: Address, - val executorHost: String , + override val executorHost: String, var freeCores: Int, - val totalCores: Int -) + override val totalCores: Int +) extends ExecutorInfo(executorHost, totalCores) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala new file mode 100644 index 0000000000..b4738e64c9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster + +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * Stores information about an executor to pass from the scheduler to SparkListeners. + */ +@DeveloperApi +class ExecutorInfo( + val executorHost: String, + val totalCores: Int +) { + + def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo] + + override def equals(other: Any): Boolean = other match { + case that: ExecutorInfo => + (that canEqual this) && + executorHost == that.executorHost && + totalCores == that.totalCores + case _ => false + } + + override def hashCode(): Int = { + val state = Seq(executorHost, totalCores) + state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 75d8ddf375..d252fe8595 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -27,9 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, + ExecutorInfo => MesosExecutorInfo, _} import org.apache.spark.{Logging, SparkContext, SparkException, TaskState} +import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler._ import org.apache.spark.util.Utils @@ -62,6 +64,9 @@ private[spark] class MesosSchedulerBackend( var classLoader: ClassLoader = null + // The listener bus to publish executor added/removed events. + val listenerBus = sc.listenerBus + @volatile var appId: String = _ override def start() { @@ -87,7 +92,7 @@ private[spark] class MesosSchedulerBackend( } } - def createExecutorInfo(execId: String): ExecutorInfo = { + def createExecutorInfo(execId: String): MesosExecutorInfo = { val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility .getOrElse { @@ -141,7 +146,7 @@ private[spark] class MesosSchedulerBackend( Value.Scalar.newBuilder() .setValue(MemoryUtils.calculateTotalMemory(sc)).build()) .build() - ExecutorInfo.newBuilder() + MesosExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) @@ -237,6 +242,7 @@ private[spark] class MesosSchedulerBackend( } val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap + val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] @@ -260,6 +266,10 @@ private[spark] class MesosSchedulerBackend( val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? mesosTasks.foreach { case (slaveId, tasks) => + slaveIdToWorkerOffer.get(slaveId).foreach(o => + listenerBus.post(SparkListenerExecutorAdded(slaveId, + new ExecutorInfo(o.host, o.cores))) + ) d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } @@ -315,7 +325,7 @@ private[spark] class MesosSchedulerBackend( synchronized { if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { // We lost the executor on this slave, so remember that it's gone - slaveIdsWithExecutors -= taskIdToSlaveId(tid) + removeExecutor(taskIdToSlaveId(tid)) } if (isFinished(status.getState)) { taskIdToSlaveId.remove(tid) @@ -344,12 +354,20 @@ private[spark] class MesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} + /** + * Remove executor associated with slaveId in a thread safe manner. + */ + private def removeExecutor(slaveId: String) = { + synchronized { + listenerBus.post(SparkListenerExecutorRemoved(slaveId)) + slaveIdsWithExecutors -= slaveId + } + } + private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { inClassLoader() { logInfo("Mesos slave lost: " + slaveId.getValue) - synchronized { - slaveIdsWithExecutors -= slaveId.getValue - } + removeExecutor(slaveId.getValue) scheduler.executorLost(slaveId.getValue, reason) } } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index d94e825265..a025011006 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -19,6 +19,8 @@ package org.apache.spark.util import java.util.{Properties, UUID} +import org.apache.spark.scheduler.cluster.ExecutorInfo + import scala.collection.JavaConverters._ import scala.collection.Map @@ -83,7 +85,10 @@ private[spark] object JsonProtocol { applicationStartToJson(applicationStart) case applicationEnd: SparkListenerApplicationEnd => applicationEndToJson(applicationEnd) - + case executorAdded: SparkListenerExecutorAdded => + executorAddedToJson(executorAdded) + case executorRemoved: SparkListenerExecutorRemoved => + executorRemovedToJson(executorRemoved) // These aren't used, but keeps compiler happy case SparkListenerShutdown => JNothing case SparkListenerExecutorMetricsUpdate(_, _) => JNothing @@ -194,6 +199,16 @@ private[spark] object JsonProtocol { ("Timestamp" -> applicationEnd.time) } + def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = { + ("Event" -> Utils.getFormattedClassName(executorAdded)) ~ + ("Executor ID" -> executorAdded.executorId) ~ + ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo)) + } + + def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = { + ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~ + ("Executor ID" -> executorRemoved.executorId) + } /** ------------------------------------------------------------------- * * JSON serialization methods for classes SparkListenerEvents depend on | @@ -362,6 +377,10 @@ private[spark] object JsonProtocol { ("Disk Size" -> blockStatus.diskSize) } + def executorInfoToJson(executorInfo: ExecutorInfo): JValue = { + ("Host" -> executorInfo.executorHost) ~ + ("Total Cores" -> executorInfo.totalCores) + } /** ------------------------------ * * Util JSON serialization methods | @@ -416,6 +435,8 @@ private[spark] object JsonProtocol { val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD) val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart) val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd) + val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded) + val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved) (json \ "Event").extract[String] match { case `stageSubmitted` => stageSubmittedFromJson(json) @@ -431,6 +452,8 @@ private[spark] object JsonProtocol { case `unpersistRDD` => unpersistRDDFromJson(json) case `applicationStart` => applicationStartFromJson(json) case `applicationEnd` => applicationEndFromJson(json) + case `executorAdded` => executorAddedFromJson(json) + case `executorRemoved` => executorRemovedFromJson(json) } } @@ -523,6 +546,16 @@ private[spark] object JsonProtocol { SparkListenerApplicationEnd((json \ "Timestamp").extract[Long]) } + def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = { + val executorId = (json \ "Executor ID").extract[String] + val executorInfo = executorInfoFromJson(json \ "Executor Info") + SparkListenerExecutorAdded(executorId, executorInfo) + } + + def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = { + val executorId = (json \ "Executor ID").extract[String] + SparkListenerExecutorRemoved(executorId) + } /** --------------------------------------------------------------------- * * JSON deserialization methods for classes SparkListenerEvents depend on | @@ -745,6 +778,11 @@ private[spark] object JsonProtocol { BlockStatus(storageLevel, memorySize, diskSize, tachyonSize) } + def executorInfoFromJson(json: JValue): ExecutorInfo = { + val executorHost = (json \ "Host").extract[String] + val totalCores = (json \ "Total Cores").extract[Int] + new ExecutorInfo(executorHost, totalCores) + } /** -------------------------------- * * Util JSON deserialization methods | diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 1de7e13003..437d8693c0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -160,7 +160,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin */ private def testApplicationEventLogging(compressionCodec: Option[String] = None) { val conf = getLoggingConf(testDirPath, compressionCodec) - val sc = new SparkContext("local", "test", conf) + val sc = new SparkContext("local-cluster[2,2,512]", "test", conf) assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get val expectedLogDir = testDir.toURI().toString() @@ -184,6 +184,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin val eventSet = mutable.Set( SparkListenerApplicationStart, SparkListenerBlockManagerAdded, + SparkListenerExecutorAdded, SparkListenerEnvironmentUpdate, SparkListenerJobStart, SparkListenerJobEnd, diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala new file mode 100644 index 0000000000..623a687c35 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.{SparkContext, LocalSparkContext} + +import org.scalatest.{FunSuite, BeforeAndAfter, BeforeAndAfterAll} + +import scala.collection.mutable + +/** + * Unit tests for SparkListener that require a local cluster. + */ +class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext + with BeforeAndAfter with BeforeAndAfterAll { + + /** Length of time to wait while draining listener events. */ + val WAIT_TIMEOUT_MILLIS = 10000 + + before { + sc = new SparkContext("local-cluster[2,1,512]", "SparkListenerSuite") + } + + test("SparkListener sends executor added message") { + val listener = new SaveExecutorInfo + sc.addSparkListener(listener) + + val rdd1 = sc.parallelize(1 to 100, 4) + val rdd2 = rdd1.map(_.toString) + rdd2.setName("Target RDD") + rdd2.count() + + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + assert(listener.addedExecutorInfo.size == 2) + assert(listener.addedExecutorInfo("0").totalCores == 1) + assert(listener.addedExecutorInfo("1").totalCores == 1) + } + + private class SaveExecutorInfo extends SparkListener { + val addedExecutorInfo = mutable.Map[String, ExecutorInfo]() + + override def onExecutorAdded(executor: SparkListenerExecutorAdded) { + addedExecutorInfo(executor.executorId) = executor.executorInfo + } + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 48f5e40f50..78a30a40bf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -18,17 +18,20 @@ package org.apache.spark.scheduler.mesos import org.scalatest.FunSuite -import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext} -import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl} +import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext} +import org.apache.spark.scheduler.{SparkListenerExecutorAdded, LiveListenerBus, + TaskDescription, WorkerOffer, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend} import org.apache.mesos.SchedulerDriver -import org.apache.mesos.Protos._ -import org.scalatest.mock.EasyMockSugar +import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, _} import org.apache.mesos.Protos.Value.Scalar import org.easymock.{Capture, EasyMock} import java.nio.ByteBuffer import java.util.Collections import java.util +import org.scalatest.mock.EasyMockSugar + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -52,11 +55,16 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea val driver = EasyMock.createMock(classOf[SchedulerDriver]) val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) + listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2))) + EasyMock.replay(listenerBus) + val sc = EasyMock.createMock(classOf[SparkContext]) EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes() + EasyMock.expect(sc.listenerBus).andReturn(listenerBus) EasyMock.replay(sc) val minMem = MemoryUtils.calculateTotalMemory(sc).toInt diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 63c2559c5c..5ba94ff67d 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.util.Properties +import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.shuffle.MetadataFetchFailedException import scala.collection.Map @@ -69,6 +70,9 @@ class JsonProtocolSuite extends FunSuite { val unpersistRdd = SparkListenerUnpersistRDD(12345) val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield") val applicationEnd = SparkListenerApplicationEnd(42L) + val executorAdded = SparkListenerExecutorAdded("exec1", + new ExecutorInfo("Hostee.awesome.com", 11)) + val executorRemoved = SparkListenerExecutorRemoved("exec2") testEvent(stageSubmitted, stageSubmittedJsonString) testEvent(stageCompleted, stageCompletedJsonString) @@ -85,6 +89,8 @@ class JsonProtocolSuite extends FunSuite { testEvent(unpersistRdd, unpersistRDDJsonString) testEvent(applicationStart, applicationStartJsonString) testEvent(applicationEnd, applicationEndJsonString) + testEvent(executorAdded, executorAddedJsonString) + testEvent(executorRemoved, executorRemovedJsonString) } test("Dependent Classes") { @@ -94,6 +100,7 @@ class JsonProtocolSuite extends FunSuite { testTaskMetrics(makeTaskMetrics( 33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500)) + testExecutorInfo(new ExecutorInfo("host", 43)) // StorageLevel testStorageLevel(StorageLevel.NONE) @@ -303,6 +310,10 @@ class JsonProtocolSuite extends FunSuite { assert(blockId === newBlockId) } + private def testExecutorInfo(info: ExecutorInfo) { + val newInfo = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info)) + assertEquals(info, newInfo) + } /** -------------------------------- * | Util methods for comparing events | @@ -335,6 +346,11 @@ class JsonProtocolSuite extends FunSuite { assertEquals(e1.jobResult, e2.jobResult) case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) => assertEquals(e1.environmentDetails, e2.environmentDetails) + case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) => + assert(e1.executorId == e1.executorId) + assertEquals(e1.executorInfo, e2.executorInfo) + case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) => + assert(e1.executorId == e1.executorId) case (e1, e2) => assert(e1 === e2) case _ => fail("Events don't match in types!") @@ -387,6 +403,11 @@ class JsonProtocolSuite extends FunSuite { assert(info1.accumulables === info2.accumulables) } + private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) { + assert(info1.executorHost == info2.executorHost) + assert(info1.totalCores == info2.totalCores) + } + private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) { assert(metrics1.hostname === metrics2.hostname) assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime) @@ -1407,4 +1428,24 @@ class JsonProtocolSuite extends FunSuite { | "Timestamp": 42 |} """ + + private val executorAddedJsonString = + """ + |{ + | "Event": "SparkListenerExecutorAdded", + | "Executor ID": "exec1", + | "Executor Info": { + | "Host": "Hostee.awesome.com", + | "Total Cores": 11 + | } + |} + """ + + private val executorRemovedJsonString = + """ + |{ + | "Event": "SparkListenerExecutorRemoved", + | "Executor ID": "exec2" + |} + """ } -- cgit v1.2.3