aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-09-10 15:39:58 -0700
committerDenny <dennybritz@gmail.com>2012-09-10 15:39:58 -0700
commit4d3471dd077e9e9c9038707eb5ba3fb8539c05e0 (patch)
tree78b7593b782090706c81061c317f8cbaeb536c94
parent94a7e82ba12b8cb241415b50d9944f20750d1ffe (diff)
downloadspark-4d3471dd077e9e9c9038707eb5ba3fb8539c05e0.tar.gz
spark-4d3471dd077e9e9c9038707eb5ba3fb8539c05e0.tar.bz2
spark-4d3471dd077e9e9c9038707eb5ba3fb8539c05e0.zip
Fix serialization bugs and add local cluster tests
-rw-r--r--core/src/main/scala/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala2
-rw-r--r--core/src/test/scala/spark/FileServerSuite.scala33
3 files changed, 33 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 7a1bf692e4..2bd07f10d4 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -330,7 +330,7 @@ class SparkContext(
// Fetch the file locally in case the task is executed locally
val filename = new File(path.split("/").last)
- Utils.fetchFile(path, new File(""))
+ Utils.fetchFile(path, new File("."))
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
}
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 3687bb990c..a551bcc782 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -141,7 +141,7 @@ class ShuffleMapTask(
val jarSetNumBytes = in.readInt()
val jarSetBytes = new Array[Byte](jarSetNumBytes)
in.readFully(jarSetBytes)
- fileSet = ShuffleMapTask.deserializeFileSet(jarSetBytes)
+ jarSet = ShuffleMapTask.deserializeFileSet(jarSetBytes)
partition = in.readInt()
generation = in.readLong()
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index 05517e8be4..500af1eb90 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -33,7 +33,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
}
}
- test("Distributing files") {
+ test("Distributing files locally") {
sc = new SparkContext("local[4]", "test")
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
@@ -47,7 +47,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
assert(result.toSet === Set((1,200), (2,300), (3,500)))
}
- test ("Dynamically adding JARS") {
+ test ("Dynamically adding JARS locally") {
sc = new SparkContext("local[4]", "test")
val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
sc.addJar(sampleJarFile)
@@ -60,5 +60,34 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
}.collect()
assert(result.toSet === Set((1,2), (2,7), (3,121)))
}
+
+ test("Distributing files on a standalone cluster") {
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ sc.addFile(tmpFile.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+ val fileVal = in.readLine().toInt
+ in.close()
+ _ * fileVal + _ * fileVal
+ }.collect
+ println(result)
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+
+ test ("Dynamically adding JARS on a standalone cluster") {
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
+ sc.addJar(sampleJarFile)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+ val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ a + b
+ }.collect()
+ assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ }
} \ No newline at end of file