-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala              12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala     5
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala  6
3 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 942c5975ec..3f1a7dd99d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1901,7 +1901,17 @@ object SparkContext extends Logging {
private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
- private[spark] val DRIVER_IDENTIFIER = "<driver>"
+ /**
+ * Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
+ * changed to `driver` because the angle brackets caused escaping issues in URLs and XML (see
+ * SPARK-6716 for more details).
+ */
+ private[spark] val DRIVER_IDENTIFIER = "driver"
+
+ /**
+ * Legacy version of DRIVER_IDENTIFIER, retained for backwards-compatibility.
+ */
+ private[spark] val LEGACY_DRIVER_IDENTIFIER = "<driver>"
// The following deprecated objects have already been copied to `object AccumulatorParam` to
// make the compiler find them automatically. They are duplicate codes only for backward
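
For context on the escaping issue the new doc comment cites: the angle brackets in the legacy id must be percent-encoded in URLs (and entity-escaped in XML), whereas the new id passes through unchanged. A minimal, self-contained sketch (the two string values mirror this patch; the object name is illustrative):

    import java.net.URLEncoder

    object DriverIdEscapingDemo {
      def main(args: Array[String]): Unit = {
        // The legacy id is mangled by URL encoding...
        println(URLEncoder.encode("<driver>", "UTF-8"))  // prints %3Cdriver%3E
        // ...while the new id round-trips cleanly.
        println(URLEncoder.encode("driver", "UTF-8"))    // prints driver
      }
    }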
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index a6f1ebf325..69ac37511e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -60,7 +60,10 @@ class BlockManagerId private (
def port: Int = port_
- def isDriver: Boolean = { executorId == SparkContext.DRIVER_IDENTIFIER }
+ def isDriver: Boolean = {
+ executorId == SparkContext.DRIVER_IDENTIFIER ||
+ executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER
+ }
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeUTF(executorId_)
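
The widened isDriver check keeps ids recorded by pre-SPARK-6716 versions (for example, ids read back from data written by an older Spark) classified as driver ids. A hedged usage sketch, assuming it is compiled somewhere under the org.apache.spark package since BlockManagerId's factory is package-private:

    import org.apache.spark.storage.BlockManagerId

    // An id written by current code uses the new constant...
    val current = BlockManagerId("driver", "localhost", 7077)
    // ...while an id read back from older data still carries the legacy form.
    val legacy = BlockManagerId("<driver>", "localhost", 7077)

    assert(current.isDriver)  // matches DRIVER_IDENTIFIER
    assert(legacy.isDriver)   // matches LEGACY_DRIVER_IDENTIFIER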
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 283090e3bd..6dc5bc4cb0 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -139,6 +139,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(id2_.eq(id1), "Deserialized id2 is not the same object as original id1")
}
+ test("BlockManagerId.isDriver() backwards-compatibility with legacy driver ids (SPARK-6716)") {
+ assert(BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "XXX", 1).isDriver)
+ assert(BlockManagerId(SparkContext.LEGACY_DRIVER_IDENTIFIER, "XXX", 1).isDriver)
+ assert(!BlockManagerId("notADriverIdentifier", "XXX", 1).isDriver)
+ }
+
test("master + 1 manager interaction") {
store = makeBlockManager(20000)
val a1 = new Array[Byte](4000)
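
Since writeExternal (visible in the BlockManagerId hunk above) writes the executor id verbatim, a legacy id also survives a full Java-serialization round trip and is still recognized afterwards. A sketch under the same package-visibility assumption as above:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.spark.storage.BlockManagerId

    val legacy = BlockManagerId("<driver>", "localhost", 7077)

    // Serialize the id the same way Spark ships it between processes.
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(legacy)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val revived = in.readObject().asInstanceOf[BlockManagerId]

    assert(revived.isDriver)  // the "<driver>" id round-trips and is still a driver id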