aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/FileServerSuite.scala16
2 files changed, 21 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c9bc01cba5..158197ae4d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -683,7 +683,7 @@ class SparkContext(
/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), or an HTTP, HTTPS or FTP URI.
+ * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
if (path == null) {
@@ -696,6 +696,7 @@ class SparkContext(
} else {
val uri = new URI(path)
key = uri.getScheme match {
+ // A JAR file which exists only on the driver node
case null | "file" =>
if (env.hadoop.isYarnMode()) {
// In order for this to work on yarn the user must specify the --addjars option to
@@ -713,6 +714,9 @@ class SparkContext(
} else {
env.httpFileServer.addJar(new File(uri.getPath))
}
+ // A JAR file which exists locally on every worker node
+ case "local" =>
+ "file:" + uri.getPath
case _ =>
path
}
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 35d1d41af1..c210dd5c3b 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -120,4 +120,20 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
+
+ test ("Dynamically adding JARS on a standalone cluster using local: URL") {
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+ sc.addJar(sampleJarFile.replace("file", "local"))
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
+ val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ a + b
+ }.collect()
+ assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ }
}