aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTakuya UESHIN <ueshin@happy-camper.st>2014-03-27 22:17:15 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-03-27 22:17:15 -0700
commit3d89043b7ed13bc1bb703f6eb7c00e46b936de1e (patch)
treeff79c56a407eb6c0f849cfbe5f4b246ffe98f279
parent6f986f0b87bd03f4df2bf6c917e61241e9b14ac2 (diff)
downloadspark-3d89043b7ed13bc1bb703f6eb7c00e46b936de1e.tar.gz
spark-3d89043b7ed13bc1bb703f6eb7c00e46b936de1e.tar.bz2
spark-3d89043b7ed13bc1bb703f6eb7c00e46b936de1e.zip
[SPARK-1210] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executo...
...r. Constructor of `org.apache.spark.executor.Executor` should not set context class loader of current thread, which is backend Actor's thread. Run the following code in local-mode REPL. ``` scala> case class Foo(i: Int) scala> val ret = sc.parallelize((1 to 100).map(Foo), 10).collect ``` This causes errors as follows: ``` ERROR actor.OneForOneStrategy: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo; java.lang.ArrayStoreException: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo; at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88) at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870) at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870) at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ``` This is because the class loaders to deserialize result `Foo` instances might be different from backend Actor's, and the Actor's class loader should be the same as Driver's. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #15 from ueshin/wip/wrongcontextclassloader and squashes the following commits: d79e8c0 [Takuya UESHIN] Change a parent class loader of ExecutorURLClassLoader. c6c09b6 [Takuya UESHIN] Add a test to collect objects of class defined in repl. 43e0feb [Takuya UESHIN] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executor.
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala5
-rw-r--r--repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala11
2 files changed, 13 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 8fe9b848ba..13e2e29242 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -112,11 +112,10 @@ private[spark] class Executor(
}
}
- // Create our ClassLoader and set it on this thread
+ // Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
- Thread.currentThread.setContextClassLoader(replClassLoader)
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
@@ -294,7 +293,7 @@ private[spark] class Executor(
* created by the interpreter to the search path
*/
private def createClassLoader(): ExecutorURLClassLoader = {
- val loader = this.getClass.getClassLoader
+ val loader = Thread.currentThread().getContextClassLoader
// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 8203b8f612..4155007c6d 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -242,4 +242,15 @@ class ReplSuite extends FunSuite {
assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
}
}
+
+ test("collecting objects of class defined in repl") {
+ val output = runInterpreter("local[2]",
+ """
+ |case class Foo(i: Int)
+ |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains("ret: Array[Foo] = Array(Foo(1),", output)
+ }
}