path: root/core/src/test
author    Hurshal Patel <hpatel516@gmail.com>  2015-11-18 09:28:59 -0800
committer Yin Huai <yhuai@databricks.com>      2015-11-18 09:28:59 -0800
commit    3cca5ffb3d60d5de9a54bc71cf0b8279898936d2 (patch)
tree      fe55b37acc27cdcccc4a0061e2787615f1842464 /core/src/test
parent    224723e6a8b198ef45d6c5ca5d2f9c61188ada8f (diff)
[SPARK-11195][CORE] Use correct classloader for TaskResultGetter
Make sure we are using the context classloader when deserializing failed TaskResults instead of the Spark classloader. The issue is that `enqueueFailedTask` was using the incorrect classloader, which resulted in a `ClassNotFoundException`.

Adds a test in TaskResultGetterSuite that compiles a custom exception, throws it on the executor, and asserts that Spark deserializes the TaskResult correctly instead of returning `UnknownReason`.

See #9367 for previous comments.
See SPARK-11195 for a full repro.

Author: Hurshal Patel <hpatel516@gmail.com>

Closes #9779 from choochootrain/spark-11195-master.
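For background on the mechanism the fix relies on (this is an illustrative sketch of the general technique, not the patch itself; `LoaderAwareObjectInputStream` and `LoaderAwareDeserializer` are hypothetical names): Java serialization resolves classes through a classloader, so deserializing a task failure that references a user-defined exception throws `ClassNotFoundException` unless the resolving loader can see user classes. A deserializer that honors the thread's context classloader looks roughly like this in Scala:

    import java.io.{ByteArrayInputStream, InputStream, ObjectInputStream, ObjectStreamClass}

    object LoaderAwareDeserializer {
      // An ObjectInputStream that resolves classes against an explicit loader
      // (e.g. the thread's context classloader) rather than the JVM default.
      class LoaderAwareObjectInputStream(in: InputStream, loader: ClassLoader)
          extends ObjectInputStream(in) {
        override def resolveClass(desc: ObjectStreamClass): Class[_] =
          Class.forName(desc.getName, false, loader)
      }

      // Deserialize with the context classloader when one is set, falling back
      // to this class's own loader otherwise.
      def deserialize[T](bytes: Array[Byte]): T = {
        val loader = Option(Thread.currentThread().getContextClassLoader)
          .getOrElse(getClass.getClassLoader)
        val in = new LoaderAwareObjectInputStream(new ByteArrayInputStream(bytes), loader)
        try in.readObject().asInstanceOf[T] finally in.close()
      }
    }

Resolving against the context classloader first is what lets the driver name the user's exception class; a loader that only knows Spark's own classes would fail at exactly the `resolveClass` step above.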
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 65
1 file changed, 64 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 815caa79ff..bc72c3685e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import java.io.File
+import java.net.URL
import java.nio.ByteBuffer
import scala.concurrent.duration._
@@ -26,8 +28,10 @@ import scala.util.control.NonFatal
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually._
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark._
import org.apache.spark.storage.TaskResultBlockId
+import org.apache.spark.TestUtils.JavaSourceFromString
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
/**
* Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
@@ -119,5 +123,64 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
// Make sure two tasks were run (one failed one, and a second retried one).
assert(scheduler.nextTaskId.get() === 2)
}
+
+ /**
+ * Make sure we are using the context classloader when deserializing failed TaskResults instead
+ * of the Spark classloader.
+ *
+ * This test compiles a jar containing an exception and tests that when it is thrown on the
+ * executor, enqueueFailedTask can correctly deserialize the failure and identify the thrown
+ * exception as the cause.
+ *
+ * Before this fix, enqueueFailedTask would throw a ClassNotFoundException when deserializing
+ * the exception, resulting in an UnknownReason for the TaskEndResult.
+ */
+ test("failed task deserialized with the correct classloader (SPARK-11195)") {
+ // compile a small jar containing an exception that will be thrown on an executor.
+ val tempDir = Utils.createTempDir()
+ val srcDir = new File(tempDir, "repro/")
+ srcDir.mkdirs()
+ val excSource = new JavaSourceFromString(new File(srcDir, "MyException").getAbsolutePath,
+ """package repro;
+ |
+ |public class MyException extends Exception {
+ |}
+ """.stripMargin)
+ val excFile = TestUtils.createCompiledClass("MyException", srcDir, excSource, Seq.empty)
+ val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
+ TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("repro"))
+
+ // ensure we reset the classloader after the test completes
+ val originalClassLoader = Thread.currentThread.getContextClassLoader
+ try {
+ // load the exception from the jar
+ val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
+ loader.addURL(jarFile.toURI.toURL)
+ Thread.currentThread().setContextClassLoader(loader)
+ val excClass: Class[_] = Utils.classForName("repro.MyException")
+
+ // NOTE: we must run the cluster with "local" so that the executor can load the compiled
+ // jar.
+ sc = new SparkContext("local", "test", conf)
+ val rdd = sc.parallelize(Seq(1), 1).map { _ =>
+ val exc = excClass.newInstance().asInstanceOf[Exception]
+ throw exc
+ }
+
+ // the driver should not have any problems resolving the exception class and determining
+ // why the task failed.
+ val exceptionMessage = intercept[SparkException] {
+ rdd.collect()
+ }.getMessage
+
+ val expectedFailure = """(?s).*Lost task.*: repro.MyException.*""".r
+ val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r
+
+ assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
+ assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
+ } finally {
+ Thread.currentThread.setContextClassLoader(originalClassLoader)
+ }
+ }
}
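Stripped of the Spark test helpers, the loading pattern the test depends on is a plain URLClassLoader over the compiled jar, installed as the thread's context classloader and restored afterwards. A standalone sketch of that pattern (the jar path is a placeholder; `repro.MyException` is the class the test compiles):

    import java.io.File
    import java.net.URLClassLoader

    object ContextLoaderDemo {
      def main(args: Array[String]): Unit = {
        val jarFile = new File("/tmp/testJar.jar") // placeholder path
        val original = Thread.currentThread().getContextClassLoader
        try {
          // Parent the new loader on the original so ordinary classes still resolve.
          val loader = new URLClassLoader(Array(jarFile.toURI.toURL), original)
          Thread.currentThread().setContextClassLoader(loader)
          // Classes that exist only in the jar are now visible via the context loader.
          val excClass = Class.forName("repro.MyException", true, loader)
          val exc = excClass.getDeclaredConstructor().newInstance().asInstanceOf[Exception]
          println(s"loaded and instantiated: ${exc.getClass.getName}")
        } finally {
          Thread.currentThread().setContextClassLoader(original)
        }
      }
    }

Restoring the original loader in a finally block, as the test also does, keeps one test's loader from leaking into the rest of the suite.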