From de0285556ac3fa068e9b33d6db14af2168504c1f Mon Sep 17 00:00:00 2001 From: Evan Chan Date: Wed, 30 Oct 2013 09:41:35 -0700 Subject: Add support for local:// URI scheme for addJars() This indicates that a jar is available locally on each worker node. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++++- .../test/scala/org/apache/spark/FileServerSuite.scala | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c9bc01cba5..158197ae4d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -683,7 +683,7 @@ class SparkContext( /** * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported - * filesystems), or an HTTP, HTTPS or FTP URI. + * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node. */ def addJar(path: String) { if (path == null) { @@ -696,6 +696,7 @@ class SparkContext( } else { val uri = new URI(path) key = uri.getScheme match { + // A JAR file which exists only on the driver node case null | "file" => if (env.hadoop.isYarnMode()) { // In order for this to work on yarn the user must specify the --addjars option to @@ -713,6 +714,9 @@ class SparkContext( } else { env.httpFileServer.addJar(new File(uri.getPath)) } + // A JAR file which exists locally on every worker node + case "local" => + "file:" + uri.getPath case _ => path } diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index 35d1d41af1..c210dd5c3b 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -120,4 +120,20 @@ class FileServerSuite extends FunSuite with LocalSparkContext { }.collect() assert(result.toSet === Set((1,2), (2,7), (3,121))) } + + test ("Dynamically adding JARS on a standalone cluster using local: URL") { + sc = new SparkContext("local-cluster[1,1,512]", "test") + val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() + sc.addJar(sampleJarFile.replace("file", "local")) + val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) + val result = sc.parallelize(testData).reduceByKey { (x,y) => + val fac = Thread.currentThread.getContextClassLoader() + .loadClass("org.uncommons.maths.Maths") + .getDeclaredMethod("factorial", classOf[Int]) + val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt + val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt + a + b + }.collect() + assert(result.toSet === Set((1,2), (2,7), (3,121))) + } } -- cgit v1.2.3