diff options
author | Takuya UESHIN <ueshin@happy-camper.st> | 2014-03-27 22:17:15 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-27 22:17:15 -0700 |
commit | 3d89043b7ed13bc1bb703f6eb7c00e46b936de1e (patch) | |
tree | ff79c56a407eb6c0f849cfbe5f4b246ffe98f279 /repl/src/test/scala | |
parent | 6f986f0b87bd03f4df2bf6c917e61241e9b14ac2 (diff) | |
download | spark-3d89043b7ed13bc1bb703f6eb7c00e46b936de1e.tar.gz spark-3d89043b7ed13bc1bb703f6eb7c00e46b936de1e.tar.bz2 spark-3d89043b7ed13bc1bb703f6eb7c00e46b936de1e.zip |
[SPARK-1210] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executo...
...r.
Constructor of `org.apache.spark.executor.Executor` should not set context class loader of current thread, which is backend Actor's thread.
Run the following code in local-mode REPL.
```
scala> case class Foo(i: Int)
scala> val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
```
This causes errors as follows:
```
ERROR actor.OneForOneStrategy: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo;
java.lang.ArrayStoreException: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo;
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870)
at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870)
at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
This is because the class loaders to deserialize result `Foo` instances might be different from backend Actor's, and the Actor's class loader should be the same as Driver's.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #15 from ueshin/wip/wrongcontextclassloader and squashes the following commits:
d79e8c0 [Takuya UESHIN] Change a parent class loader of ExecutorURLClassLoader.
c6c09b6 [Takuya UESHIN] Add a test to collect objects of class defined in repl.
43e0feb [Takuya UESHIN] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executor.
Diffstat (limited to 'repl/src/test/scala')
-rw-r--r-- | repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 |
1 files changed, 11 insertions, 0 deletions
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 8203b8f612..4155007c6d 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -242,4 +242,15 @@ class ReplSuite extends FunSuite { assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output) } } + + test("collecting objects of class defined in repl") { + val output = runInterpreter("local[2]", + """ + |case class Foo(i: Int) + |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains("ret: Array[Foo] = Array(Foo(1),", output) + } } |