aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-10-30 12:03:44 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-10-30 12:03:44 -0700
commit618c1f6cf3008caae7a8c0202721a6bd77d29a0f (patch)
tree58020a6617f7ee5c1b8c33d675ce89ebb6ead6a4 /core
parent745dc4290834e8c38d61600e23e4706b0ae47179 (diff)
parentde0285556ac3fa068e9b33d6db14af2168504c1f (diff)
downloadspark-618c1f6cf3008caae7a8c0202721a6bd77d29a0f.tar.gz
spark-618c1f6cf3008caae7a8c0202721a6bd77d29a0f.tar.bz2
spark-618c1f6cf3008caae7a8c0202721a6bd77d29a0f.zip
Merge pull request #125 from velvia/2013-10/local-jar-uri
Add support for local:// URI scheme for addJars() This PR adds support for a new URI scheme for SparkContext.addJars(): `local://file/path`. The *local* scheme indicates that the `/file/path` exists on every worker node. The reason for its existence is for big library JARs, which would be really expensive to serve using the standard HTTP fileserver distribution method, especially for big clusters. Today the only inexpensive method (assuming such a file is on every host, via say NFS, rsync, etc.) of doing this is to add the JAR to the SPARK_CLASSPATH, but we want a method where the user does not need to modify the Spark configuration. I would add something to the docs, but it's not obvious where to add it. Oh, and it would be great if this could be merged in time for 0.8.1.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/FileServerSuite.scala16
2 files changed, 21 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c9bc01cba5..158197ae4d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -683,7 +683,7 @@ class SparkContext(
/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), or an HTTP, HTTPS or FTP URI.
+ * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
if (path == null) {
@@ -696,6 +696,7 @@ class SparkContext(
} else {
val uri = new URI(path)
key = uri.getScheme match {
+ // A JAR file which exists only on the driver node
case null | "file" =>
if (env.hadoop.isYarnMode()) {
// In order for this to work on yarn the user must specify the --addjars option to
@@ -713,6 +714,9 @@ class SparkContext(
} else {
env.httpFileServer.addJar(new File(uri.getPath))
}
+ // A JAR file which exists locally on every worker node
+ case "local" =>
+ "file:" + uri.getPath
case _ =>
path
}
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 35d1d41af1..c210dd5c3b 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -120,4 +120,20 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
+
+ test ("Dynamically adding JARS on a standalone cluster using local: URL") {
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+ sc.addJar(sampleJarFile.replace("file", "local"))
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
+ val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ a + b
+ }.collect()
+ assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ }
}