author    Timothy Chen <tnachen@gmail.com>    2015-04-14 11:48:12 -0700
committer Andrew Or <andrew@databricks.com>   2015-04-14 11:49:04 -0700
commit    320bca4508e890b874c2eb7abb76a30ef14c932f (patch)
tree      f0edb3e3c3e917cc2a41954910fa05ef46277d56 /core
parent    51b306b930cfe03ad21af72a3a6ef31e6e626235 (diff)
[SPARK-6081] Support fetching http/https uris in driver runner.
Currently, if passed URIs such as http/https, the driver runner is not able to fetch them, as it only calls the Hadoop FileSystem get. This fix uses the existing utility method to fetch remote URIs as well.

Author: Timothy Chen <tnachen@gmail.com>

Closes #4832 from tnachen/driver_remote and squashes the following commits:

aa52cd6 [Timothy Chen] Support fetching remote uris in driver runner.
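[Editorial context] The fix works because Utils.fetchFile dispatches on the URI scheme rather than assuming every URL is backed by a Hadoop FileSystem, which is all the old FileUtil.copy path could handle. Below is a minimal, JDK-only sketch of that dispatch idea; all names are illustrative and this is not Spark's actual implementation.

    import java.io.{File, InputStream}
    import java.net.URI
    import java.nio.file.{Files, Paths, StandardCopyOption}

    // Sketch of scheme-aware fetching. Spark's Utils.fetchFile additionally
    // handles caching, security, and Hadoop filesystems; only the scheme
    // dispatch is shown here.
    object FetchSketch {
      def fetch(url: String, targetDir: File): File = {
        val uri = new URI(url)
        val dest = new File(targetDir, new File(uri.getPath).getName)
        Option(uri.getScheme).getOrElse("file") match {
          case "http" | "https" | "ftp" =>
            // Remote URI: stream the bytes over a URL connection.
            val in: InputStream = uri.toURL.openStream()
            try Files.copy(in, dest.toPath, StandardCopyOption.REPLACE_EXISTING)
            finally in.close()
          case "file" =>
            // Local path: a plain filesystem copy suffices.
            Files.copy(Paths.get(uri.getPath), dest.toPath,
              StandardCopyOption.REPLACE_EXISTING)
          case other =>
            // hdfs:// and friends go through the Hadoop FileSystem API, the
            // only case the previous FileUtil.copy-based code supported.
            throw new UnsupportedOperationException(
              s"scheme $other needs a Hadoop FileSystem")
        }
        dest
      }
    }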
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala    | 21
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala           |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala       |  7
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala |  7
4 files changed, 23 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index e0948e16ef..ef7a703bff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -24,14 +24,14 @@ import scala.collection.JavaConversions._
import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
-import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.fs.Path
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Utils, Clock, SystemClock}
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -44,7 +44,8 @@ private[deploy] class DriverRunner(
val sparkHome: File,
val driverDesc: DriverDescription,
val worker: ActorRef,
- val workerUrl: String)
+ val workerUrl: String,
+ val securityManager: SecurityManager)
extends Logging {
@volatile private var process: Option[Process] = None
@@ -136,12 +137,9 @@ private[deploy] class DriverRunner(
* Will throw an exception if there are errors downloading the jar.
*/
private def downloadUserJar(driverDir: File): String = {
-
val jarPath = new Path(driverDesc.jarUrl)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
- val jarFileSystem = jarPath.getFileSystem(hadoopConf)
-
val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
val localJarFile = new File(driverDir, jarFileName)
@@ -149,7 +147,14 @@ private[deploy] class DriverRunner(
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
- FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
+ Utils.fetchFile(
+ driverDesc.jarUrl,
+ driverDir,
+ conf,
+ securityManager,
+ hadoopConf,
+ System.currentTimeMillis(),
+ useCache = false)
}
if (!localJarFile.exists()) { // Verify copy succeeded
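[Editorial note on the new Utils.fetchFile call above] This is the same helper executors use to pull down spark.files/spark.jars, so http, https, ftp, local, and Hadoop-backed schemes all work; the timestamp argument only matters for cache bookkeeping, and useCache = false makes the driver runner fetch directly into its work directory. A hedged, illustrative call site follows; the URL and directory are hypothetical, and because Utils is Spark-internal (private[spark]) the snippet assumes it compiles under Spark's own package namespace.

    package org.apache.spark.demo  // assumption: inside org.apache.spark so Utils is visible

    import java.io.File

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.util.Utils

    object FetchJarExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        // Fetch an https-hosted driver jar into a local work directory.
        Utils.fetchFile(
          "https://example.com/jars/my-driver.jar",  // hypothetical URL
          new File("/tmp/driver-work"),              // hypothetical work dir
          conf,
          new SecurityManager(conf),
          SparkHadoopUtil.get.newConfiguration(conf),
          System.currentTimeMillis(),
          useCache = false)
      }
    }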
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c4c24a7866..3ee2eb69e8 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -436,7 +436,8 @@ private[worker] class Worker(
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
- akkaUrl)
+ akkaUrl,
+ securityMgr)
drivers(driverId) = driver
driver.start()
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 2071701b31..b58d62567a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
class JsonProtocolSuite extends FunSuite {
@@ -124,8 +124,9 @@ class JsonProtocolSuite extends FunSuite {
}
def createDriverRunner(): DriverRunner = {
- new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
- createDriverDesc(), null, "akka://worker")
+ val conf = new SparkConf()
+ new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
+ createDriverDesc(), null, "akka://worker", new SecurityManager(conf))
}
def assertValidJson(json: JValue) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index aa6e4874ce..2159fd8c16 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -25,7 +25,7 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.FunSuite
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.util.Clock
@@ -33,8 +33,9 @@ class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
- new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
- driverDescription, null, "akka://1.2.3.4/worker/")
+ val conf = new SparkConf()
+ new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
+ driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
}
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {