aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-08-02 12:02:11 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-08-02 12:02:11 -0700
commite9fc0b6a8b4ce62cab56d18581f588c67b811f5b (patch)
tree8b8f6401cd5760f5bac5510cf1987d0864c3c0d2
parenta9beeaaaeb52e9c940fe86a3d70801655401623c (diff)
downloadspark-e9fc0b6a8b4ce62cab56d18581f588c67b811f5b.tar.gz
spark-e9fc0b6a8b4ce62cab56d18581f588c67b811f5b.tar.bz2
spark-e9fc0b6a8b4ce62cab56d18581f588c67b811f5b.zip
[SPARK-16787] SparkContext.addFile() should not throw if called twice with the same file
## What changes were proposed in this pull request? The behavior of `SparkContext.addFile()` changed slightly with the introduction of the Netty-RPC-based file server, which was introduced in Spark 1.6 (where it was disabled by default) and became the default / only file server in Spark 2.0.0. Prior to 2.0, calling `SparkContext.addFile()` with files that have the same name and identical contents would succeed. This behavior was never explicitly documented but Spark has behaved this way since very early 1.x versions. In 2.0 (or 1.6 with the Netty file server enabled), the second `addFile()` call will fail with a requirement error because NettyStreamManager tries to guard against duplicate file registration. This problem also affects `addJar()` in a more subtle way: the `fileServer.addJar()` call will also fail with an exception but that exception is logged and ignored; I believe that the problematic exception-catching path was mistakenly copied from some old code which was only relevant to very old versions of Spark and YARN mode. I believe that this change of behavior was unintentional, so this patch weakens the `require` check so that adding the same filename at the same path will succeed. At file download time, Spark tasks will fail with exceptions if an executor already has a local copy of a file and that file's contents do not match the contents of the file being downloaded / added. As a result, it's important that we prevent files with the same name and different contents from being served because allowing that can effectively brick an executor by preventing it from successfully launching any new tasks. Before this patch's change, this was prevented by forbidding `addFile()` from being called twice on files with the same name. Because Spark does not defensively copy local files that are passed to `addFile` it is vulnerable to files' contents changing, so I think it's okay to rely on an implicit assumption that these files are intended to be immutable (since if they _are_ mutable then this can lead to either explicit task failures or implicit incorrectness (in case new executors silently get newer copies of the file while old executors continue to use an older version)). To guard against this, I have decided to only update the file addition timestamps on the first call to `addFile()`; duplicate calls will succeed but will not update the timestamp. This behavior is fine as long as we assume files are immutable, which seems reasonable given the behaviors described above. As part of this change, I also improved the thread-safety of the `addedJars` and `addedFiles` maps; this is important because these maps may be concurrently read by a task launching thread and written by a driver thread in case the user's driver code is multi-threaded. ## How was this patch tested? I added regression tests in `SparkContextSuite`. Author: Josh Rosen <joshrosen@databricks.com> Closes #14396 from JoshRosen/SPARK-16787.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala51
4 files changed, 78 insertions, 26 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d48e2b420d..48126c255f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import scala.collection.JavaConverters._
@@ -262,8 +262,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def env: SparkEnv = _env
// Used to store a URL for each static file/jar together with the file's local timestamp
- private[spark] val addedFiles = HashMap[String, Long]()
- private[spark] val addedJars = HashMap[String, Long]()
+ private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
+ private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
@@ -1430,14 +1430,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
schemeCorrectedPath
}
val timestamp = System.currentTimeMillis
- addedFiles(key) = timestamp
-
- // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
- hadoopConfiguration, timestamp, useCache = false)
-
- logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
- postEnvironmentUpdate()
+ if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
+ logInfo(s"Added file $path at $key with timestamp $timestamp")
+ // Fetch the file locally so that closures which are run on the driver can still use the
+ // SparkFiles API to access files.
+ Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+ hadoopConfiguration, timestamp, useCache = false)
+ postEnvironmentUpdate()
+ }
}
/**
@@ -1705,12 +1705,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
case exc: FileNotFoundException =>
logError(s"Jar not found at $path")
null
- case e: Exception =>
- // For now just log an error but allow to go through so spark examples work.
- // The spark examples don't really need the jar distributed since its also
- // the app jar.
- logError("Error adding jar (" + e + "), was the --addJars option used?")
- null
}
}
// A JAR file which exists locally on every worker node
@@ -1721,11 +1715,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
if (key != null) {
- addedJars(key) = System.currentTimeMillis
- logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+ val timestamp = System.currentTimeMillis
+ if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
+ logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+ postEnvironmentUpdate()
+ }
}
}
- postEnvironmentUpdate()
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index afcb023a99..780fadd5bd 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -66,14 +66,18 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
}
override def addFile(file: File): String = {
- require(files.putIfAbsent(file.getName(), file) == null,
- s"File ${file.getName()} already registered.")
+ val existingPath = files.putIfAbsent(file.getName, file)
+ require(existingPath == null || existingPath == file,
+ s"File ${file.getName} was already registered with a different path " +
+ s"(old path = $existingPath, new path = $file")
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}
override def addJar(file: File): String = {
- require(jars.putIfAbsent(file.getName(), file) == null,
- s"JAR ${file.getName()} already registered.")
+ val existingPath = jars.putIfAbsent(file.getName, file)
+ require(existingPath == null || existingPath == file,
+ s"File ${file.getName} was already registered with a different path " +
+ s"(old path = $existingPath, new path = $file")
s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 15f863b66c..35c4dafe9c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
import java.util.Properties
+import scala.collection.mutable
import scala.collection.mutable.HashMap
import org.apache.spark._
@@ -198,8 +199,8 @@ private[spark] object Task {
*/
def serializeWithDependencies(
task: Task[_],
- currentFiles: HashMap[String, Long],
- currentJars: HashMap[String, Long],
+ currentFiles: mutable.Map[String, Long],
+ currentJars: mutable.Map[String, Long],
serializer: SerializerInstance)
: ByteBuffer = {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 4fa3cab181..f8d143dc61 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -216,6 +216,57 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("cannot call addFile with different paths that have the same filename") {
+ val dir = Utils.createTempDir()
+ try {
+ val subdir1 = new File(dir, "subdir1")
+ val subdir2 = new File(dir, "subdir2")
+ assert(subdir1.mkdir())
+ assert(subdir2.mkdir())
+ val file1 = new File(subdir1, "file")
+ val file2 = new File(subdir2, "file")
+ Files.write("old", file1, StandardCharsets.UTF_8)
+ Files.write("new", file2, StandardCharsets.UTF_8)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test")
+ sc.addFile(file1.getAbsolutePath)
+ def getAddedFileContents(): String = {
+ sc.parallelize(Seq(0)).map { _ =>
+ scala.io.Source.fromFile(SparkFiles.get("file")).mkString
+ }.first()
+ }
+ assert(getAddedFileContents() === "old")
+ intercept[IllegalArgumentException] {
+ sc.addFile(file2.getAbsolutePath)
+ }
+ assert(getAddedFileContents() === "old")
+ } finally {
+ Utils.deleteRecursively(dir)
+ }
+ }
+
+ // Regression tests for SPARK-16787
+ for (
+ schedulingMode <- Seq("local-mode", "non-local-mode");
+ method <- Seq("addJar", "addFile")
+ ) {
+ val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString
+ val master = schedulingMode match {
+ case "local-mode" => "local"
+ case "non-local-mode" => "local-cluster[1,1,1024]"
+ }
+ test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") {
+ sc = new SparkContext(master, "test")
+ method match {
+ case "addJar" =>
+ sc.addJar(jarPath)
+ sc.addJar(jarPath)
+ case "addFile" =>
+ sc.addFile(jarPath)
+ sc.addFile(jarPath)
+ }
+ }
+ }
+
test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))