aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMin Zhou <coderplay@gmail.com>2015-07-10 09:52:40 -0700
committerAndrew Or <andrew@databricks.com>2015-07-10 09:52:40 -0700
commitc185f3a45ddbc073192f7da41303941ee4cebd4f (patch)
tree27e289012a1d800348f98c08e834cec6f249a78b
parentdb6d57f87a9d657d764e01879014bf406971855a (diff)
downloadspark-c185f3a45ddbc073192f7da41303941ee4cebd4f.tar.gz
spark-c185f3a45ddbc073192f7da41303941ee4cebd4f.tar.bz2
spark-c185f3a45ddbc073192f7da41303941ee4cebd4f.zip
[SPARK-8675] Executors created by LocalBackend won't get the same classpath as other executor backends
AFAIK, some Spark applications always use LocalBackend to perform local initialization; Spark SQL is an example. Starting a LocalEndpoint won't add the user classpath to the executor. ```java override def start() { localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint( "LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores)) } ``` This will cause the local executor to fail in scenarios such as: loading Hadoop built-in native libraries, loading other user-defined native libraries, loading user jars, reading S3 config from a site.xml file, etc. Author: Min Zhou <coderplay@gmail.com> Closes #7091 from coderplay/master and squashes the following commits: 365838f [Min Zhou] Fixed java.net.MalformedURLException, add default scheme, support relative path d215b7f [Min Zhou] Follows spark standard scala style, make the auto testing happy 84ad2cd [Min Zhou] Use system specific path separator instead of ',' 01f5d1a [Min Zhou] Merge branch 'master' of https://github.com/apache/spark e528be7 [Min Zhou] Merge branch 'master' of https://github.com/apache/spark 45bf62c [Min Zhou] SPARK-8675 Executors created by LocalBackend won't get the same classpath as other executor backends
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala19
1 file changed, 17 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 3078a1b10b..776e5d330e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler.local
+import java.io.File
+import java.net.URL
import java.nio.ByteBuffer
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
@@ -40,6 +42,7 @@ private case class StopExecutor()
*/
private[spark] class LocalEndpoint(
override val rpcEnv: RpcEnv,
+ userClassPath: Seq[URL],
scheduler: TaskSchedulerImpl,
executorBackend: LocalBackend,
private val totalCores: Int)
@@ -51,7 +54,7 @@ private[spark] class LocalEndpoint(
private val localExecutorHostname = "localhost"
private val executor = new Executor(
- localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
+ localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
override def receive: PartialFunction[Any, Unit] = {
case ReviveOffers =>
@@ -97,10 +100,22 @@ private[spark] class LocalBackend(
private val appId = "local-" + System.currentTimeMillis
var localEndpoint: RpcEndpointRef = null
+ private val userClassPath = getUserClasspath(conf)
+
+ /**
+ * Returns a list of URLs representing the user classpath.
+ *
+ * @param conf Spark configuration.
+ */
+ def getUserClasspath(conf: SparkConf): Seq[URL] = {
+ val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
+ userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
+ }
override def start() {
localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
- "LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores))
+ "LocalBackendEndpoint",
+ new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
}
override def stop() {