diff options
-rw-r--r-- | core/src/test/scala/org/apache/spark/FileServerSuite.scala | 94 |
1 files changed, 50 insertions, 44 deletions
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index 063b5fbab4..a15c3751c2 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -17,15 +17,46 @@ package org.apache.spark +import java.io._ +import java.util.jar.{JarEntry, JarOutputStream} + +import SparkContext._ import com.google.common.io.Files import org.scalatest.FunSuite -import java.io.{File, PrintWriter, FileReader, BufferedReader} -import SparkContext._ class FileServerSuite extends FunSuite with LocalSparkContext { @transient var tmpFile: File = _ - @transient var testJarFile: File = _ + @transient var testJarFile: String = _ + + + override def beforeAll() { + super.beforeAll() + val buffer = new Array[Byte](10240) + val tmpdir = new File(Files.createTempDir(), "test") + tmpdir.mkdir() + val tmpJarEntry = new File(tmpdir, "FileServerSuite2.txt") + val pw = new PrintWriter(tmpJarEntry) + pw.println("test String in the file named FileServerSuite2.txt") + pw.close() + // The ugliest code possible, was translated from java. + val tmpFile2 = new File(tmpdir, "test.jar") + val stream = new FileOutputStream(tmpFile2) + val jar = new JarOutputStream(stream, new java.util.jar.Manifest()) + val jarAdd = new JarEntry(tmpJarEntry.getName) + jarAdd.setTime(tmpJarEntry.lastModified) + jar.putNextEntry(jarAdd) + val in = new FileInputStream(tmpJarEntry) + var nRead = 0 + while (nRead <= 0) { + nRead = in.read(buffer, 0, buffer.length) + jar.write(buffer, 0, nRead) + } + in.close() + jar.close() + stream.close() + testJarFile = tmpFile2.getAbsolutePath + } override def beforeEach() { super.beforeEach() @@ -75,20 +106,15 @@ class FileServerSuite extends FunSuite with LocalSparkContext { assert(result.toSet === Set((1,200), (2,300), (3,500))) } - ignore ("Dynamically adding JARS locally") { + test ("Dynamically adding JARS locally") { sc = new SparkContext("local[4]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) + sc.addJar(testJarFile) + val testData = Array((1, 1)) + sc.parallelize(testData).foreach { (x) => + if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) { + throw new SparkException("jar not added") + } + } } test("Distributing files on a standalone cluster") { @@ -105,35 +131,15 @@ class FileServerSuite extends FunSuite with LocalSparkContext { assert(result.toSet === Set((1,200), (2,300), (3,500))) } - ignore ("Dynamically adding JARS on a standalone cluster") { + test ("Dynamically adding JARS on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) + sc.addJar(testJarFile) + val testData = Array((1,1)) + sc.parallelize(testData).foreach { (x) => + if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) { + throw new SparkException("jar not added") + } + } } - ignore ("Dynamically adding JARS on a standalone cluster using local: URL") { - sc = new SparkContext("local-cluster[1,1,512]", "test") - val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile() - sc.addJar(sampleJarFile.replace("file", "local")) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0)) - val result = sc.parallelize(testData).reduceByKey { (x,y) => - val fac = Thread.currentThread.getContextClassLoader() - .loadClass("org.uncommons.maths.Maths") - .getDeclaredMethod("factorial", classOf[Int]) - val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt - a + b - }.collect() - assert(result.toSet === Set((1,2), (2,7), (3,121))) - } } |