aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-08-02 12:02:11 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-08-02 12:02:11 -0700
commite9fc0b6a8b4ce62cab56d18581f588c67b811f5b (patch)
tree8b8f6401cd5760f5bac5510cf1987d0864c3c0d2 /core/src/test/scala/org/apache
parenta9beeaaaeb52e9c940fe86a3d70801655401623c (diff)
downloadspark-e9fc0b6a8b4ce62cab56d18581f588c67b811f5b.tar.gz
spark-e9fc0b6a8b4ce62cab56d18581f588c67b811f5b.tar.bz2
spark-e9fc0b6a8b4ce62cab56d18581f588c67b811f5b.zip
[SPARK-16787] SparkContext.addFile() should not throw if called twice with the same file
## What changes were proposed in this pull request? The behavior of `SparkContext.addFile()` changed slightly with the introduction of the Netty-RPC-based file server, which was introduced in Spark 1.6 (where it was disabled by default) and became the default / only file server in Spark 2.0.0. Prior to 2.0, calling `SparkContext.addFile()` with files that have the same name and identical contents would succeed. This behavior was never explicitly documented but Spark has behaved this way since very early 1.x versions. In 2.0 (or 1.6 with the Netty file server enabled), the second `addFile()` call will fail with a requirement error because NettyStreamManager tries to guard against duplicate file registration. This problem also affects `addJar()` in a more subtle way: the `fileServer.addJar()` call will also fail with an exception but that exception is logged and ignored; I believe that the problematic exception-catching path was mistakenly copied from some old code which was only relevant to very old versions of Spark and YARN mode. I believe that this change of behavior was unintentional, so this patch weakens the `require` check so that adding the same filename at the same path will succeed. At file download time, Spark tasks will fail with exceptions if an executor already has a local copy of a file and that file's contents do not match the contents of the file being downloaded / added. As a result, it's important that we prevent files with the same name and different contents from being served because allowing that can effectively brick an executor by preventing it from successfully launching any new tasks. Before this patch's change, this was prevented by forbidding `addFile()` from being called twice on files with the same name. Because Spark does not defensively copy local files that are passed to `addFile` it is vulnerable to files' contents changing, so I think it's okay to rely on an implicit assumption that these files are intended to be immutable (since if they _are_ mutable then this can lead to either explicit task failures or implicit incorrectness (in case new executors silently get newer copies of the file while old executors continue to use an older version)). To guard against this, I have decided to only update the file addition timestamps on the first call to `addFile()`; duplicate calls will succeed but will not update the timestamp. This behavior is fine as long as we assume files are immutable, which seems reasonable given the behaviors described above. As part of this change, I also improved the thread-safety of the `addedJars` and `addedFiles` maps; this is important because these maps may be concurrently read by a task launching thread and written by a driver thread in case the user's driver code is multi-threaded. ## How was this patch tested? I added regression tests in `SparkContextSuite`. Author: Josh Rosen <joshrosen@databricks.com> Closes #14396 from JoshRosen/SPARK-16787.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala51
1 files changed, 51 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 4fa3cab181..f8d143dc61 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -216,6 +216,57 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("cannot call addFile with different paths that have the same filename") {
+ val dir = Utils.createTempDir()
+ try {
+ val subdir1 = new File(dir, "subdir1")
+ val subdir2 = new File(dir, "subdir2")
+ assert(subdir1.mkdir())
+ assert(subdir2.mkdir())
+ val file1 = new File(subdir1, "file")
+ val file2 = new File(subdir2, "file")
+ Files.write("old", file1, StandardCharsets.UTF_8)
+ Files.write("new", file2, StandardCharsets.UTF_8)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test")
+ sc.addFile(file1.getAbsolutePath)
+ def getAddedFileContents(): String = {
+ sc.parallelize(Seq(0)).map { _ =>
+ scala.io.Source.fromFile(SparkFiles.get("file")).mkString
+ }.first()
+ }
+ assert(getAddedFileContents() === "old")
+ intercept[IllegalArgumentException] {
+ sc.addFile(file2.getAbsolutePath)
+ }
+ assert(getAddedFileContents() === "old")
+ } finally {
+ Utils.deleteRecursively(dir)
+ }
+ }
+
+ // Regression tests for SPARK-16787
+ for (
+ schedulingMode <- Seq("local-mode", "non-local-mode");
+ method <- Seq("addJar", "addFile")
+ ) {
+ val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString
+ val master = schedulingMode match {
+ case "local-mode" => "local"
+ case "non-local-mode" => "local-cluster[1,1,1024]"
+ }
+ test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") {
+ sc = new SparkContext(master, "test")
+ method match {
+ case "addJar" =>
+ sc.addJar(jarPath)
+ sc.addJar(jarPath)
+ case "addFile" =>
+ sc.addFile(jarPath)
+ sc.addFile(jarPath)
+ }
+ }
+ }
+
test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))