about | summary | refs | log | tree | commit | diff
path: root/repl/src/test
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-12-10 13:26:30 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-12-10 13:26:30 -0800
commit4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (patch)
treee675378fe850f9fbcf2cafea7cae589876f147a8 /repl/src/test
parent2ecbe02d5b28ee562d10c1735244b90a08532c9e (diff)
downloadspark-4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c.tar.gz
spark-4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c.tar.bz2
spark-4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c.zip
[SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes.
This avoids bringing up yet another HTTP server on the driver, and instead reuses the file server already managed by the driver's RpcEnv. As a bonus, the repl now inherits the security features of the network library. There's also a small change to create the directory for storing classes under the root temp dir for the application (instead of directly under java.io.tmpdir). Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9923 from vanzin/SPARK-11563.
Diffstat (limited to 'repl/src/test')
-rw-r--r--  repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala | 48
1 file changed, 38 insertions(+), 10 deletions(-)
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index c1211f7596..1360f09e7f 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -18,24 +18,29 @@
package org.apache.spark.repl
import java.io.File
-import java.net.{URL, URLClassLoader}
+import java.net.{URI, URL, URLClassLoader}
+import java.nio.channels.{FileChannel, ReadableByteChannel}
import java.nio.charset.StandardCharsets
+import java.nio.file.{Paths, StandardOpenOption}
import java.util
-import com.google.common.io.Files
-
import scala.concurrent.duration._
import scala.io.Source
import scala.language.implicitConversions
import scala.language.postfixOps
+import com.google.common.io.Files
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Interruptor
import org.scalatest.concurrent.Timeouts._
import org.scalatest.mock.MockitoSugar
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Matchers.anyString
import org.mockito.Mockito._
import org.apache.spark._
+import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.Utils
class ExecutorClassLoaderSuite
@@ -78,7 +83,7 @@ class ExecutorClassLoaderSuite
test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), null, url1, parentLoader, true)
val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "1")
@@ -86,7 +91,7 @@ class ExecutorClassLoaderSuite
test("parent first") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, false)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), null, url1, parentLoader, false)
val fakeClass = classLoader.loadClass("ReplFakeClass1").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -94,7 +99,7 @@ class ExecutorClassLoaderSuite
test("child first can fall back") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), null, url1, parentLoader, true)
val fakeClass = classLoader.loadClass("ReplFakeClass3").newInstance()
val fakeClassVersion = fakeClass.toString
assert(fakeClassVersion === "2")
@@ -102,7 +107,7 @@ class ExecutorClassLoaderSuite
test("child first can fail") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), null, url1, parentLoader, true)
intercept[java.lang.ClassNotFoundException] {
classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance()
}
@@ -110,7 +115,7 @@ class ExecutorClassLoaderSuite
test("resource from parent") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), null, url1, parentLoader, true)
val resourceName: String = parentResourceNames.head
val is = classLoader.getResourceAsStream(resourceName)
assert(is != null, s"Resource $resourceName not found")
@@ -120,7 +125,7 @@ class ExecutorClassLoaderSuite
test("resources from parent") {
val parentLoader = new URLClassLoader(urls2, null)
- val classLoader = new ExecutorClassLoader(new SparkConf(), url1, parentLoader, true)
+ val classLoader = new ExecutorClassLoader(new SparkConf(), null, url1, parentLoader, true)
val resourceName: String = parentResourceNames.head
val resources: util.Enumeration[URL] = classLoader.getResources(resourceName)
assert(resources.hasMoreElements, s"Resource $resourceName not found")
@@ -142,7 +147,7 @@ class ExecutorClassLoaderSuite
SparkEnv.set(mockEnv)
// Create an ExecutorClassLoader that's configured to load classes from the HTTP server
val parentLoader = new URLClassLoader(Array.empty, null)
- val classLoader = new ExecutorClassLoader(conf, classServer.uri, parentLoader, false)
+ val classLoader = new ExecutorClassLoader(conf, null, classServer.uri, parentLoader, false)
classLoader.httpUrlConnectionTimeoutMillis = 500
// Check that this class loader can actually load classes that exist
val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
@@ -177,4 +182,27 @@ class ExecutorClassLoaderSuite
failAfter(10 seconds)(tryAndFailToLoadABunchOfClasses())(interruptor)
}
+ test("fetch classes using Spark's RpcEnv") {
+ val env = mock[SparkEnv]
+ val rpcEnv = mock[RpcEnv]
+ when(env.rpcEnv).thenReturn(rpcEnv)
+ when(rpcEnv.openChannel(anyString())).thenAnswer(new Answer[ReadableByteChannel]() {
+ override def answer(invocation: InvocationOnMock): ReadableByteChannel = {
+ val uri = new URI(invocation.getArguments()(0).asInstanceOf[String])
+ val path = Paths.get(tempDir1.getAbsolutePath(), uri.getPath().stripPrefix("/"))
+ FileChannel.open(path, StandardOpenOption.READ)
+ }
+ })
+
+ val classLoader = new ExecutorClassLoader(new SparkConf(), env, "spark://localhost:1234",
+ getClass().getClassLoader(), false)
+
+ val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
+ val fakeClassVersion = fakeClass.toString
+ assert(fakeClassVersion === "1")
+ intercept[java.lang.ClassNotFoundException] {
+ classLoader.loadClass("ReplFakeClassDoesNotExist").newInstance()
+ }
+ }
+
}