aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-09-04 18:52:07 -0700
committerDenny <dennybritz@gmail.com>2012-09-10 12:49:09 -0700
commitb864c36a3098e0ad8a2e508c94877bb2f4f4205d (patch)
treea36902a063b0346790ebcd8b282661166af5e02d /core/src/test/scala
parentf275fb07da33cfa38fc02ed121a52caef20f61d0 (diff)
downloadspark-b864c36a3098e0ad8a2e508c94877bb2f4f4205d.tar.gz
spark-b864c36a3098e0ad8a2e508c94877bb2f4f4205d.tar.bz2
spark-b864c36a3098e0ad8a2e508c94877bb2f4f4205d.zip
Dynamically adding jar files and caching fileSets.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/spark/FileServerSuite.scala39
1 file changed, 30 insertions, 9 deletions
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index 883149feca..05517e8be4 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -1,16 +1,23 @@
package spark
+import com.google.common.io.Files
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import java.io.{File, PrintWriter}
+import SparkContext._
class FileServerSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
+ var tmpFile : File = _
+ var testJarFile : File = _
before {
// Create a sample text file
- val pw = new PrintWriter(System.getProperty("java.io.tmpdir") + "FileServerSuite.txt")
+ val tmpdir = new File(Files.createTempDir(), "test")
+ tmpdir.mkdir()
+ tmpFile = new File(tmpdir, "FileServerSuite.txt")
+ val pw = new PrintWriter(tmpFile)
pw.println("100")
pw.close()
}
@@ -21,7 +28,6 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
sc = null
}
// Clean up downloaded file
- val tmpFile = new File("FileServerSuite.txt")
if (tmpFile.exists) {
tmpFile.delete()
}
@@ -29,15 +35,30 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
test("Distributing files") {
sc = new SparkContext("local[4]", "test")
- sc.addFile(System.getProperty("java.io.tmpdir") + "FileServerSuite.txt")
- val testRdd = sc.parallelize(List(1,2,3,4))
- val result = testRdd.map { x =>
- val in = new java.io.BufferedReader(new java.io.FileReader("FileServerSuite.txt"))
+ sc.addFile(tmpFile.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
val fileVal = in.readLine().toInt
in.close()
- fileVal
- }.reduce(_ + _)
- assert(result == 400)
+ _ * fileVal + _ * fileVal
+ }.collect
+ println(result)
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+ test ("Dynamically adding JARS") {
+ sc = new SparkContext("local[4]", "test")
+ val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
+ sc.addJar(sampleJarFile)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+ val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ a + b
+ }.collect()
+ assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
} \ No newline at end of file