-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                  36
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala  12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala                 5
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala             51
4 files changed, 78 insertions(+), 26 deletions(-)
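
The patch below (SPARK-16787) switches SparkContext's addedFiles/addedJars bookkeeping to concurrent maps and makes a repeated addFile/addJar call with the same path a no-op, while a second file with the same name but a different path is rejected by the RPC file server. As a rough illustration of the resulting driver-side behavior -- not part of the patch; the object name and the /tmp/data.txt path are made up, and Spark with this change is assumed on the classpath:

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddFileIdempotenceDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    try {
      sc.addFile("/tmp/data.txt")   // registers the file and logs "Added file ..."
      sc.addFile("/tmp/data.txt")   // same path again: silently ignored after this patch
      // Adding a *different* path whose filename is also "data.txt" would now fail with
      // an IllegalArgumentException from the file server (see the new SparkContextSuite test).
      println(SparkFiles.get("data.txt"))
    } finally {
      sc.stop()
    }
  }
}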
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d48e2b420d..48126c255f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import scala.collection.JavaConverters._
@@ -262,8 +262,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def env: SparkEnv = _env
// Used to store a URL for each static file/jar together with the file's local timestamp
- private[spark] val addedFiles = HashMap[String, Long]()
- private[spark] val addedJars = HashMap[String, Long]()
+ private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
+ private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
@@ -1430,14 +1430,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
schemeCorrectedPath
}
val timestamp = System.currentTimeMillis
- addedFiles(key) = timestamp
-
- // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
- hadoopConfiguration, timestamp, useCache = false)
-
- logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
- postEnvironmentUpdate()
+ if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
+ logInfo(s"Added file $path at $key with timestamp $timestamp")
+ // Fetch the file locally so that closures which are run on the driver can still use the
+ // SparkFiles API to access files.
+ Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+ hadoopConfiguration, timestamp, useCache = false)
+ postEnvironmentUpdate()
+ }
}
/**
@@ -1705,12 +1705,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
case exc: FileNotFoundException =>
logError(s"Jar not found at $path")
null
- case e: Exception =>
- // For now just log an error but allow to go through so spark examples work.
- // The spark examples don't really need the jar distributed since its also
- // the app jar.
- logError("Error adding jar (" + e + "), was the --addJars option used?")
- null
}
}
// A JAR file which exists locally on every worker node
@@ -1721,11 +1715,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}
if (key != null) {
- addedJars(key) = System.currentTimeMillis
- logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+ val timestamp = System.currentTimeMillis
+ if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
+ logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+ postEnvironmentUpdate()
+ }
}
}
- postEnvironmentUpdate()
}
/**
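
For reference, the new addedFiles/addedJars maps above are Java ConcurrentHashMaps viewed through JavaConverters.asScala, so their putIfAbsent returns an Option rather than null; the .isEmpty check is true only for the call that actually inserted the key. A standalone sketch of those semantics, with illustrative names and keys only:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object PutIfAbsentSemantics {
  def main(args: Array[String]): Unit = {
    // asScala on a ConcurrentHashMap yields a scala.collection.concurrent.Map whose
    // putIfAbsent returns Option[V]: None if the key was newly inserted, Some(existing) otherwise.
    val added = new ConcurrentHashMap[String, Long]().asScala
    assert(added.putIfAbsent("spark://host/files/foo.txt", 1L).isEmpty)      // first add wins
    assert(added.putIfAbsent("spark://host/files/foo.txt", 2L).contains(1L)) // second add is a no-op
  }
}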
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index afcb023a99..780fadd5bd 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -66,14 +66,18 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
}
override def addFile(file: File): String = {
- require(files.putIfAbsent(file.getName(), file) == null,
- s"File ${file.getName()} already registered.")
+ val existingPath = files.putIfAbsent(file.getName, file)
+    require(existingPath == null || existingPath == file,
+      s"File ${file.getName} was already registered with a different path " +
+        s"(old path = $existingPath, new path = $file)")
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}
override def addJar(file: File): String = {
- require(jars.putIfAbsent(file.getName(), file) == null,
- s"JAR ${file.getName()} already registered.")
+ val existingPath = jars.putIfAbsent(file.getName, file)
+    require(existingPath == null || existingPath == file,
+      s"JAR ${file.getName} was already registered with a different path " +
+        s"(old path = $existingPath, new path = $file)")
s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}
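
Here files and jars are plain Java ConcurrentHashMaps: putIfAbsent returns null when the key was absent and the existing value otherwise, and java.io.File equality compares paths, so re-registering the same path passes the require while a same-named file from another directory fails. A small sketch outside Spark, with made-up paths:

import java.io.File
import java.util.concurrent.ConcurrentHashMap

object FileServerRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val files = new ConcurrentHashMap[String, File]()
    val a = new File("/tmp/subdir1/file")
    val b = new File("/tmp/subdir2/file")       // same filename, different path

    // Java's putIfAbsent returns null when the key was absent.
    assert(files.putIfAbsent(a.getName, a) == null)
    // Re-adding the same path is tolerated: the existing mapping equals the new File.
    assert(files.putIfAbsent(a.getName, a) == a)
    // A different path under the same filename is what the patched require() rejects.
    val conflict = files.putIfAbsent(b.getName, b)
    assert(conflict != null && conflict != b)
  }
}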
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 15f863b66c..35c4dafe9c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
import java.util.Properties
+import scala.collection.mutable
import scala.collection.mutable.HashMap
import org.apache.spark._
@@ -198,8 +199,8 @@ private[spark] object Task {
*/
def serializeWithDependencies(
task: Task[_],
- currentFiles: HashMap[String, Long],
- currentJars: HashMap[String, Long],
+ currentFiles: mutable.Map[String, Long],
+ currentJars: mutable.Map[String, Long],
serializer: SerializerInstance)
: ByteBuffer = {
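
The parameter types are widened here because the asScala view of a ConcurrentHashMap is a scala.collection.concurrent.Map, which conforms to mutable.Map but is not a mutable.HashMap. A quick check, illustrative only:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable

object MapViewTypes {
  def main(args: Array[String]): Unit = {
    val addedFiles = new ConcurrentHashMap[String, Long]().asScala
    // Compiles: the concurrent wrapper is a mutable.Map...
    val asMutable: mutable.Map[String, Long] = addedFiles
    // ...but it is not a mutable.HashMap, so the old signature would no longer fit.
    println(asMutable.isInstanceOf[mutable.HashMap[_, _]])  // false
  }
}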
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 4fa3cab181..f8d143dc61 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -216,6 +216,57 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}
+ test("cannot call addFile with different paths that have the same filename") {
+ val dir = Utils.createTempDir()
+ try {
+ val subdir1 = new File(dir, "subdir1")
+ val subdir2 = new File(dir, "subdir2")
+ assert(subdir1.mkdir())
+ assert(subdir2.mkdir())
+ val file1 = new File(subdir1, "file")
+ val file2 = new File(subdir2, "file")
+ Files.write("old", file1, StandardCharsets.UTF_8)
+ Files.write("new", file2, StandardCharsets.UTF_8)
+ sc = new SparkContext("local-cluster[1,1,1024]", "test")
+ sc.addFile(file1.getAbsolutePath)
+ def getAddedFileContents(): String = {
+ sc.parallelize(Seq(0)).map { _ =>
+ scala.io.Source.fromFile(SparkFiles.get("file")).mkString
+ }.first()
+ }
+ assert(getAddedFileContents() === "old")
+ intercept[IllegalArgumentException] {
+ sc.addFile(file2.getAbsolutePath)
+ }
+ assert(getAddedFileContents() === "old")
+ } finally {
+ Utils.deleteRecursively(dir)
+ }
+ }
+
+ // Regression tests for SPARK-16787
+ for (
+ schedulingMode <- Seq("local-mode", "non-local-mode");
+ method <- Seq("addJar", "addFile")
+ ) {
+ val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString
+ val master = schedulingMode match {
+ case "local-mode" => "local"
+ case "non-local-mode" => "local-cluster[1,1,1024]"
+ }
+ test(s"$method can be called twice with same file in $schedulingMode (SPARK-16787)") {
+ sc = new SparkContext(master, "test")
+ method match {
+ case "addJar" =>
+ sc.addJar(jarPath)
+ sc.addJar(jarPath)
+ case "addFile" =>
+ sc.addFile(jarPath)
+ sc.addFile(jarPath)
+ }
+ }
+ }
+
test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))