commit 9334d699671edd8f18370255017ad40c1d0340ee
tree   de004daa8852f1d3a055267063888114adff3dc2
parent 5231a3f228b5482cba09ae23a9f68498eba03c88
Author:     Andrew Or <andrew@databricks.com>
AuthorDate: 2014-10-30 15:32:46 -0700
Commit:     Andrew Or <andrew@databricks.com>
CommitDate: 2014-10-30 15:32:46 -0700
[SPARK-4155] Consolidate usages of <driver>

We use "<driver>" everywhere. Let's not do that.

Author: Andrew Or <andrew@databricks.com>

Closes #3020 from andrewor14/consolidate-driver and squashes the following commits:

c1c2204 [Andrew Or] Just use "<driver>" for local executor ID
3d751e9 [Andrew Or] Consolidate usages of <driver>
 core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala            |  2 +-
 core/src/main/scala/org/apache/spark/SparkContext.scala                         |  2 ++
 core/src/main/scala/org/apache/spark/SparkEnv.scala                             |  2 +-
 core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala         |  4 ++--
 core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala               |  3 ++-
 core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala        | 13 ++-----------
 core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala                 |  6 ++----
 core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala |  8 +++++---
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala            | 10 ++++++----
 9 files changed, 23 insertions(+), 27 deletions(-)
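
The essence of the change is replacing a magic string, "<driver>", scattered across call sites with a single named constant. Below is a minimal, self-contained sketch of the pattern using hypothetical stand-in names (SparkContextLike, BlockManagerIdLike), not Spark's actual classes:

object SparkContextLike {
  // Single source of truth for the driver's executor ID; mirrors the
  // SparkContext.DRIVER_IDENTIFIER constant this commit introduces.
  val DRIVER_IDENTIFIER: String = "<driver>"
}

// Hypothetical stand-in for BlockManagerId: call sites compare against the
// shared constant instead of repeating the "<driver>" literal.
case class BlockManagerIdLike(executorId: String) {
  def isDriver: Boolean = executorId == SparkContextLike.DRIVER_IDENTIFIER
}

object ConsolidationDemo extends App {
  println(BlockManagerIdLike(SparkContextLike.DRIVER_IDENTIFIER).isDriver) // true
  println(BlockManagerIdLike("executor-1").isDriver)                       // false
}

Centralizing the literal means a future rename touches one definition, and the compiler catches a misspelled constant name where a string comparison would silently return false.
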
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b2cf022baf..c11f1db006 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -419,7 +419,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
val executorId = blockManagerAdded.blockManagerId.executorId
- if (executorId != "<driver>") {
+ if (executorId != SparkContext.DRIVER_IDENTIFIER) {
allocationManager.onExecutorAdded(executorId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 73668e83bb..6bfcd8ceae 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1333,6 +1333,8 @@ object SparkContext extends Logging {
private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
+ private[spark] val DRIVER_IDENTIFIER = "<driver>"
+
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6a6dfda363..557d2f5128 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,7 @@ object SparkEnv extends Logging {
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
- create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+ create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, true, isLocal, listenerBus)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 58b78f041c..c0264836de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import akka.actor.{Actor, ActorRef, Props}
-import org.apache.spark.{Logging, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
@@ -47,7 +47,7 @@ private[spark] class LocalActor(
private var freeCores = totalCores
- private val localExecutorId = "localhost"
+ private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
private val localExecutorHostname = "localhost"
val executor = new Executor(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 259f423c73..b177a59c72 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
@@ -59,7 +60,7 @@ class BlockManagerId private (
def port: Int = port_
- def isDriver: Boolean = (executorId == "<driver>")
+ def isDriver: Boolean = { executorId == SparkContext.DRIVER_IDENTIFIER }
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeUTF(executorId_)
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index d9066f7664..def49e80a3 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
import scala.collection.mutable
+import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
@@ -59,10 +60,9 @@ class StorageStatusListener extends SparkListener {
val info = taskEnd.taskInfo
val metrics = taskEnd.taskMetrics
if (info != null && metrics != null) {
- val execId = formatExecutorId(info.executorId)
val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
if (updatedBlocks.length > 0) {
- updateStorageStatus(execId, updatedBlocks)
+ updateStorageStatus(info.executorId, updatedBlocks)
}
}
}
@@ -88,13 +88,4 @@ class StorageStatusListener extends SparkListener {
}
}
- /**
- * In the local mode, there is a discrepancy between the executor ID according to the
- * task ("localhost") and that according to SparkEnv ("<driver>"). In the UI, this
- * results in duplicate rows for the same executor. Thus, in this mode, we aggregate
- * these two rows and use the executor ID of "<driver>" to be consistent.
- */
- def formatExecutorId(execId: String): String = {
- if (execId == "localhost") "<driver>" else execId
- }
}
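
The formatExecutorId workaround deleted above existed because local mode used to report two different IDs for the same JVM: task events carried "localhost" (from LocalBackend) while SparkEnv registered "<driver>", producing duplicate executor rows in the UI. Once LocalBackend uses DRIVER_IDENTIFIER (see the change above), both sources agree and the normalization becomes dead code. A rough illustration with hypothetical names, not Spark's real API:

object LocalModeIdDemo {
  val DRIVER_IDENTIFIER = "<driver>"

  // The old normalization: map the task-side "localhost" ID onto the
  // SparkEnv-side "<driver>" ID so the UI showed a single executor row.
  def formatExecutorIdOld(execId: String): String =
    if (execId == "localhost") DRIVER_IDENTIFIER else execId

  def main(args: Array[String]): Unit = {
    // After this commit the task-side ID is already DRIVER_IDENTIFIER,
    // so the old mapping reduces to the identity and can be removed.
    val taskSideId = DRIVER_IDENTIFIER
    assert(formatExecutorIdOld(taskSideId) == taskSideId)
    println(s"local-mode executor ID: $taskSideId")
  }
}
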
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 689cf02b25..9e0e71a51a 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -48,14 +48,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener
def storageStatusList = storageStatusListener.storageStatusList
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
- val eid = formatExecutorId(taskStart.taskInfo.executorId)
+ val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
- val eid = formatExecutorId(info.executorId)
+ val eid = info.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
taskEnd.reason match {
@@ -84,6 +84,4 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener
}
}
- // This addresses executor ID inconsistencies in the local mode
- private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1f1d53a1ee..c6d7105592 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -27,7 +27,7 @@ import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager}
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
@@ -57,7 +57,9 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAndAfter
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
- private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+ private def makeBlockManager(
+ maxMem: Long,
+ name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer)
@@ -108,7 +110,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAndAfter
storeIds.filterNot { _ == stores(2).blockManagerId })
// Add driver store and test whether it is filtered out
- val driverStore = makeBlockManager(1000, "<driver>")
+ val driverStore = makeBlockManager(1000, SparkContext.DRIVER_IDENTIFIER)
assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9d96202a3e..715b740b85 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -37,7 +37,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
@@ -69,7 +69,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
- private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+ private def makeBlockManager(
+ maxMem: Long,
+ name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer)
@@ -790,8 +792,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
val transfer = new NioBlockTransferService(conf, securityMgr)
- store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf,
- mapOutputTracker, shuffleManager, transfer)
+ store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
+ new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
// The put should fail since a1 is not serializable.
class UnserializableClass