aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala19
1 files changed, 17 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 3078a1b10b..776e5d330e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler.local
+import java.io.File
+import java.net.URL
import java.nio.ByteBuffer
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
@@ -40,6 +42,7 @@ private case class StopExecutor()
*/
private[spark] class LocalEndpoint(
override val rpcEnv: RpcEnv,
+ userClassPath: Seq[URL],
scheduler: TaskSchedulerImpl,
executorBackend: LocalBackend,
private val totalCores: Int)
@@ -51,7 +54,7 @@ private[spark] class LocalEndpoint(
private val localExecutorHostname = "localhost"
private val executor = new Executor(
- localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
+ localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
override def receive: PartialFunction[Any, Unit] = {
case ReviveOffers =>
@@ -97,10 +100,22 @@ private[spark] class LocalBackend(
private val appId = "local-" + System.currentTimeMillis
var localEndpoint: RpcEndpointRef = null
+ private val userClassPath = getUserClasspath(conf)
+
+ /**
+ * Returns a list of URLs representing the user classpath.
+ *
+ * @param conf Spark configuration.
+ */
+ def getUserClasspath(conf: SparkConf): Seq[URL] = {
+ val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
+ userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
+ }
override def start() {
localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
- "LocalBackendEndpoint", new LocalEndpoint(SparkEnv.get.rpcEnv, scheduler, this, totalCores))
+ "LocalBackendEndpoint",
+ new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
}
override def stop() {