diff options
author | Timothy Chen <tnachen@gmail.com> | 2015-04-14 11:48:12 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-04-14 11:49:04 -0700 |
commit | 320bca4508e890b874c2eb7abb76a30ef14c932f (patch) | |
tree | f0edb3e3c3e917cc2a41954910fa05ef46277d56 /core | |
parent | 51b306b930cfe03ad21af72a3a6ef31e6e626235 (diff) | |
download | spark-320bca4508e890b874c2eb7abb76a30ef14c932f.tar.gz spark-320bca4508e890b874c2eb7abb76a30ef14c932f.tar.bz2 spark-320bca4508e890b874c2eb7abb76a30ef14c932f.zip |
[SPARK-6081] Support fetching http/https uris in driver runner.
Currently if passed uris such as http/https, it won't able to fetch them as it only calls HadoopFs get.
This fix utilizes the existing util method to fetch remote uris as well.
Author: Timothy Chen <tnachen@gmail.com>
Closes #4832 from tnachen/driver_remote and squashes the following commits:
aa52cd6 [Timothy Chen] Support fetching remote uris in driver runner.
Diffstat (limited to 'core')
4 files changed, 23 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index e0948e16ef..ef7a703bff 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -24,14 +24,14 @@ import scala.collection.JavaConversions._ import akka.actor.ActorRef import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files -import org.apache.hadoop.fs.{FileUtil, Path} +import org.apache.hadoop.fs.Path -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SparkConf, SecurityManager} import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.DriverStateChanged import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState -import org.apache.spark.util.{Clock, SystemClock} +import org.apache.spark.util.{Utils, Clock, SystemClock} /** * Manages the execution of one driver, including automatically restarting the driver on failure. @@ -44,7 +44,8 @@ private[deploy] class DriverRunner( val sparkHome: File, val driverDesc: DriverDescription, val worker: ActorRef, - val workerUrl: String) + val workerUrl: String, + val securityManager: SecurityManager) extends Logging { @volatile private var process: Option[Process] = None @@ -136,12 +137,9 @@ private[deploy] class DriverRunner( * Will throw an exception if there are errors downloading the jar. */ private def downloadUserJar(driverDir: File): String = { - val jarPath = new Path(driverDesc.jarUrl) val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - val jarFileSystem = jarPath.getFileSystem(hadoopConf) - val destPath = new File(driverDir.getAbsolutePath, jarPath.getName) val jarFileName = jarPath.getName val localJarFile = new File(driverDir, jarFileName) @@ -149,7 +147,14 @@ private[deploy] class DriverRunner( if (!localJarFile.exists()) { // May already exist if running multiple workers on one node logInfo(s"Copying user jar $jarPath to $destPath") - FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf) + Utils.fetchFile( + driverDesc.jarUrl, + driverDir, + conf, + securityManager, + hadoopConf, + System.currentTimeMillis(), + useCache = false) } if (!localJarFile.exists()) { // Verify copy succeeded diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index c4c24a7866..3ee2eb69e8 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -436,7 +436,8 @@ private[worker] class Worker( sparkHome, driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), self, - akkaUrl) + akkaUrl, + securityMgr) drivers(driverId) = driver driver.start() diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 2071701b31..b58d62567a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.FunSuite import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo} import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} -import org.apache.spark.SparkConf +import org.apache.spark.{SecurityManager, SparkConf} class JsonProtocolSuite extends FunSuite { @@ -124,8 +124,9 @@ class JsonProtocolSuite extends FunSuite { } def createDriverRunner(): DriverRunner = { - new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"), - createDriverDesc(), null, "akka://worker") + val conf = new SparkConf() + new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"), + createDriverDesc(), null, "akka://worker", new SecurityManager(conf)) } def assertValidJson(json: JValue) { diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index aa6e4874ce..2159fd8c16 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -25,7 +25,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.FunSuite -import org.apache.spark.SparkConf +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.{Command, DriverDescription} import org.apache.spark.util.Clock @@ -33,8 +33,9 @@ class DriverRunnerTest extends FunSuite { private def createDriverRunner() = { val command = new Command("mainClass", Seq(), Map(), Seq(), Seq(), Seq()) val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command) - new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"), - driverDescription, null, "akka://1.2.3.4/worker/") + val conf = new SparkConf() + new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"), + driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf)) } private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = { |