diff options
author | Min Zhou <coderplay@gmail.com> | 2015-07-10 09:52:40 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-07-10 09:52:40 -0700 |
commit | c185f3a45ddbc073192f7da41303941ee4cebd4f (patch) | |
tree | 27e289012a1d800348f98c08e834cec6f249a78b | |
parent | db6d57f87a9d657d764e01879014bf406971855a (diff) | |
download | spark-c185f3a45ddbc073192f7da41303941ee4cebd4f.tar.gz spark-c185f3a45ddbc073192f7da41303941ee4cebd4f.tar.bz2 spark-c185f3a45ddbc073192f7da41303941ee4cebd4f.zip |
[SPARK-8675] Executors created by LocalBackend won't get the same classpath as other executor backends
AFAIK, some Spark applications always use LocalBackend for local initialization; Spark SQL is an example. Starting a LocalEndpoint won't add the user classpath to the executor.
```java
override def start() {
localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
"LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores))
}
```
This will cause the local executor to fail in scenarios such as: loading Hadoop's built-in native libraries, loading other user-defined native libraries, loading user jars, reading S3 configuration from a site.xml file, etc.
Author: Min Zhou <coderplay@gmail.com>
Closes #7091 from coderplay/master and squashes the following commits:
365838f [Min Zhou] Fixed java.net.MalformedURLException, add default scheme, support relative path
d215b7f [Min Zhou] Follows spark standard scala style, make the auto testing happy
84ad2cd [Min Zhou] Use system specific path separator instead of ','
01f5d1a [Min Zhou] Merge branch 'master' of https://github.com/apache/spark
e528be7 [Min Zhou] Merge branch 'master' of https://github.com/apache/spark
45bf62c [Min Zhou] SPARK-8675 Executors created by LocalBackend won't get the same classpath as other executor backends
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala | 19 |
1 files changed, 17 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 3078a1b10b..776e5d330e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler.local +import java.io.File +import java.net.URL import java.nio.ByteBuffer import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState} @@ -40,6 +42,7 @@ private case class StopExecutor() */ private[spark] class LocalEndpoint( override val rpcEnv: RpcEnv, + userClassPath: Seq[URL], scheduler: TaskSchedulerImpl, executorBackend: LocalBackend, private val totalCores: Int) @@ -51,7 +54,7 @@ private[spark] class LocalEndpoint( private val localExecutorHostname = "localhost" private val executor = new Executor( - localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true) + localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true) override def receive: PartialFunction[Any, Unit] = { case ReviveOffers => @@ -97,10 +100,22 @@ private[spark] class LocalBackend( private val appId = "local-" + System.currentTimeMillis var localEndpoint: RpcEndpointRef = null + private val userClassPath = getUserClasspath(conf) + + /** + * Returns a list of URLs representing the user classpath. + * + * @param conf Spark configuration. 
+ */ + def getUserClasspath(conf: SparkConf): Seq[URL] = { + val userClassPathStr = conf.getOption("spark.executor.extraClassPath") + userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL) + } override def start() { localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint( - "LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores)) + "LocalBackendEndpoint", + new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores)) } override def stop() { |